[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13098843#comment-13098843 ] Sylvain Lebresne commented on CASSANDRA-2901:

I did not spend too much time on ParallelCompactionIterable, but I suppose there weren't many changes since last time. On the rest of the rebase, two comments:
* CompactionIterable.getReduced is changed so that it returns compactedRow when empty instead of null. This is buggy, because we expect the null to be filtered out by 'Iterators.filter(iter, Predicates.notNull())' in CompactionManager (which it won't do anymore). CompactionsPurgeTest is failing because of that.
* In the CompactionIterable constructor, there is a line ending with two semi-colons.

Apart from that, the patch lgtm from a technical standpoint. There is obviously the question of its usefulness. In particular, it's unclear whether it will be of any help when using leveled (leveldb-style) compaction together with the current multi-threading of compaction tasks (even with SSDs, that is). But the diff is not very complicated (if we exclude the new ParallelCompactionIterable class), so why not.

Allow taking advantage of multiple cores while compacting a single CF
Key: CASSANDRA-2901
URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
Project: Cassandra
Issue Type: Improvement
Components: Core
Reporter: Jonathan Ellis
Assignee: Jonathan Ellis
Priority: Minor
Fix For: 1.0
Attachments: 2901-0.8.txt, 2901-trunk.txt

Moved from CASSANDRA-1876:

There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot of copying to/from buffers. So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time); a thread pool (one per core?) merging corresponding rows from each input sstable; one thread doing serialize + writing the output (this has to wait for the merge threads to complete in order, obviously).

This should take us from being CPU-bound on SSDs (since only one core is compacting) to being I/O-bound. This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e., for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front. You'll also want a small queue size for the serialize-merged-rows executor.

Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot to context switching between reader threads.

IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA. For more information on JIRA, see: http://www.atlassian.com/software/jira
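The null-filtering contract Sylvain's first review point refers to can be illustrated with a small standalone sketch. This is not the actual Cassandra code (which uses Guava's Iterators.filter with Predicates.notNull() over an iterator of AbstractCompactedRow); it is a plain-Java rendering of the same contract, with a hypothetical reduce() standing in for getReduced():

```java
import java.util.*;
import java.util.stream.*;

public class NullFilterContract {
    // Stand-in for getReduced(): a fully purged row must come back as null
    // so the downstream not-null filter can drop it, mirroring
    // Iterators.filter(iter, Predicates.notNull()) in CompactionManager.
    static String reduce(List<String> columns) {
        return columns.isEmpty() ? null : String.join("+", columns);
    }

    public static void main(String[] args) {
        List<List<String>> rows = List.of(
            List.of("c1", "c2"),
            List.of(),            // fully purged row
            List.of("c3"));

        List<String> compacted = rows.stream()
            .map(NullFilterContract::reduce)
            .filter(Objects::nonNull)   // the not-null filter downstream relies on
            .collect(Collectors.toList());

        // Returning a non-null empty row instead of null would leave a
        // ghost row in this list, which is what broke CompactionsPurgeTest.
        System.out.println(compacted);  // [c1+c2, c3]
    }
}
```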
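The staged design above (per-sstable readers bounded to one row of look-ahead, a merge pool, and a single in-order writer) can be sketched with standard java.util.concurrent primitives. All names here are illustrative toys, not the actual ParallelCompactionIterable API; rows are plain strings and "merging" is just string joining:

```java
import java.util.*;
import java.util.concurrent.*;

public class PipelineSketch {
    public static void main(String[] args) throws Exception {
        // Three toy "sstables", each already sorted by row key.
        List<List<String>> sstables = List.of(
            List.of("a:1", "b:1", "c:1"),
            List.of("a:2", "b:2", "c:2"),
            List.of("a:3", "b:3", "c:3"));

        // One bounded queue per input sstable: each reader works at most
        // one row ahead of the merge stage (the "roughly 2x memory" bound).
        List<BlockingQueue<String>> queues = new ArrayList<>();
        ExecutorService readers = Executors.newFixedThreadPool(sstables.size());
        for (List<String> sstable : sstables) {
            BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
            queues.add(q);
            readers.submit(() -> {             // read + deserialize stage
                for (String row : sstable) q.put(row);  // blocks when full
                return null;
            });
        }

        // Merge pool (one thread per core), plus a small bounded queue of
        // futures so the writer consumes merged rows strictly in order.
        ExecutorService mergers = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors());
        BlockingQueue<Future<String>> merged = new ArrayBlockingQueue<>(4);

        int rowCount = sstables.get(0).size();
        Thread coordinator = new Thread(() -> {
            try {
                for (int i = 0; i < rowCount; i++) {
                    // Collect the corresponding row from every input sstable.
                    List<String> versions = new ArrayList<>();
                    for (BlockingQueue<String> q : queues) versions.add(q.take());
                    merged.put(mergers.submit(() -> String.join("+", versions)));
                }
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        coordinator.start();

        // Single writer: serialize + write, waiting on each merge in order.
        for (int i = 0; i < rowCount; i++)
            System.out.println(merged.take().get());

        coordinator.join();
        readers.shutdown();
        mergers.shutdown();
    }
}
```

The capacity-1 reader queues are what cap the memory overhead at roughly one extra row per input sstable, and the small `merged` queue plays the role of the "small queue size for the serialize-merged-rows executor" mentioned above.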
[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13099113#comment-13099113 ] Hudson commented on CASSANDRA-2901:

Integrated in Cassandra #1082 (See [https://builds.apache.org/job/Cassandra/1082/])
parallel compaction
patch by jbellis; reviewed by slebresne for CASSANDRA-2901
jbellis : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1166255
Files :
* /cassandra/trunk/CHANGES.txt
* /cassandra/trunk/NEWS.txt
* /cassandra/trunk/conf/cassandra.yaml
* /cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
* /cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
* /cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
* /cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13080519#comment-13080519 ] Hudson commented on CASSANDRA-2901:

Integrated in Cassandra #1008 (See [https://builds.apache.org/job/Cassandra/1008/])
refactor CompactionIterator - CompactionIterable
patch by jbellis; reviewed by slebresne for CASSANDRA-2901
jbellis : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1154635
Files :
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
* /cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13079938#comment-13079938 ] Sylvain Lebresne commented on CASSANDRA-2901:

bq. Well, I changed the semantics of maxInMemorySize.

Oops, missed that, sorry. Never mind then. Patch lgtm, +1. I would deactivate the use of parallel compaction for the tests by default before committing, though, as it is not what we care most about.

About Stu's remarks: the goal is to speed up a given compaction. It is clearly for people with SSDs, so the goal is imho more about having compactions done as quickly as possible (to have the most consistent reads possible, in particular) than about catching up when compaction is behind (which CASSANDRA-2191 does indeed solve). Even without triggering a major compaction, you can have a minor compaction that takes time. If you have the I/O capacity and idle CPU, why not have those done more quickly? As you mentioned, it is also useful for things like repair, for which, let's be honest, the other efforts underway to make the tasks less like a major compaction are only ideas at this point (not that they are bad ideas or anything; let's just not consider them done).

For CASSANDRA-1608, I admit it is less clear exactly how useful this will be with leveled compaction. However, it is not clear it will be useless either, and again, leveled compaction is not the default compaction yet. Anyway, I have to admit this is not the ticket I care most about, and it will likely not be useful for everyone. However, as it's done, the complexity is really all in ParallelCompactionIterable (the rest of the patch being mostly trivial refactorings), and that last one is completely optional, so I don't see a good reason not to add this.
[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13080153#comment-13080153 ] Hudson commented on CASSANDRA-2901:

Integrated in Cassandra #1005 (See [https://builds.apache.org/job/Cassandra/1005/])
fix tracker getting out of sync with underlying data source
patch by jbellis; reviewed by slebresne for CASSANDRA-2901
jbellis : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1154274
Files :
* /cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
* /cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
* /cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
* /cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java
[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13080241#comment-13080241 ] Hudson commented on CASSANDRA-2901:

Integrated in Cassandra-0.8 #258 (See [https://builds.apache.org/job/Cassandra-0.8/258/])
refactorings and corner-case bug fixes:
- avoid modifying the List of rows after passing it to a LazilyCompactedRow
- account for the possibility that all data compacted by LCR has expired
- clean up code duplication around shouldPurge cleanup
patch by jbellis; reviewed by slebresne for CASSANDRA-2901
jbellis : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1154369
Files :
* /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
* /cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
* /cassandra/branches/cassandra-0.8/CHANGES.txt
* /cassandra/branches/cassandra-0.8/conf/cassandra.yaml
* /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
* /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
* /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13079497#comment-13079497 ] Sylvain Lebresne commented on CASSANDRA-2901:

Never mind, LazilyCompactedRowTest seems broken with the patch above.
[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13079536#comment-13079536 ] Jonathan Ellis commented on CASSANDRA-2901:

I added some debug logging that shows that it's actually including extra columns in the first pass.

[pass 1]
{noformat}
...
DEBUG [CompactionExecutor:1] 2011-08-04 11:52:42,056 LazilyCompactedRow.java (line 225) added 16481 to serializedSize for 2ab319d0beba11e0fe8ebeead9cb
[the next are bogus]
DEBUG [CompactionExecutor:1] 2011-08-04 11:52:42,056 LazilyCompactedRow.java (line 225) added 17075 to serializedSize for 2acf0640beba11e0fe8ebeead9cb
DEBUG [CompactionExecutor:1] 2011-08-04 11:52:42,056 LazilyCompactedRow.java (line 225) added 17585 to serializedSize for 2af15b50beba11e0fe8ebeead9cb
DEBUG [CompactionExecutor:1] 2011-08-04 11:52:42,056 LazilyCompactedRow.java (line 225) added 17596 to serializedSize for 2af8fc70beba11e0fe8ebeead9cb
DEBUG [CompactionExecutor:1] 2011-08-04 11:52:42,057 LazilyCompactedRow.java (line 225) added 17493 to serializedSize for 2b0335a0beba11e0fe8ebeead9cb
DEBUG [CompactionExecutor:1] 2011-08-04 11:52:42,057 LazilyCompactedRow.java (line 225) added 17493 to serializedSize for 2b200c70beba11e0fe8ebeead9cb
{noformat}

[pass 2]
{noformat}
...
DEBUG [CompactionExecutor:1] 2011-08-04 11:52:42,088 LazilyCompactedRow.java (line 225) added 16481 to serializedSize for 2ab319d0beba11e0fe8ebeead9cb
{noformat}

Baffling.
[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13079590#comment-13079590 ] Jonathan Ellis commented on CASSANDRA-2901:

Found the bug: DeserializedColumnIterator needed to create a new iter on reset(). (So, it was indeed a problem with mixed lazy/nonlazy iteration: specifically, a problem with eager deserialize that only showed up when it needed to make multiple passes because of the presence of a lazy iterator.)

New patches up. Incorporated Sylvain's improvements as well, partly in 01 and partly in 02.
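The reset() bug described above can be reproduced with a minimal sketch. The class below is an illustrative stand-in, not the actual DeserializedColumnIterator: an in-memory column source that must support two passes over the same row (a first pass to compute the serialized size, a second to write the columns out):

```java
import java.util.*;

// Hypothetical stand-in for an eagerly deserialized column iterator that
// must be iterable twice (size pass, then write pass).
class ColumnSource {
    private final List<String> columns;
    private Iterator<String> iter;

    ColumnSource(List<String> columns) {
        this.columns = columns;
        this.iter = columns.iterator();
    }

    // The fix: build a fresh iterator. Forgetting this left the iterator
    // exhausted, so the second pass saw no columns (or, with mixed
    // lazy/eager iteration, inconsistent ones).
    void reset() {
        iter = columns.iterator();
    }

    List<String> drain() {
        List<String> out = new ArrayList<>();
        while (iter.hasNext()) out.add(iter.next());
        return out;
    }
}

public class ResetBugDemo {
    public static void main(String[] args) {
        ColumnSource src = new ColumnSource(List.of("c1", "c2", "c3"));
        List<String> pass1 = src.drain();  // size/index pass
        src.reset();
        List<String> pass2 = src.drain();  // serialize/write pass
        System.out.println(pass1.equals(pass2));  // true only with the reset fix
    }
}
```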
[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13079615#comment-13079615 ] Jonathan Ellis commented on CASSANDRA-2901:
---

bq. each deserializer now gets the full maxInMemorySize

Well, I changed the semantics of maxInMemorySize. So the constructor used outside of tests looks like
{code}
this(type, getScanners(sstables), controller, DatabaseDescriptor.getInMemoryCompactionLimit() / Iterables.size(sstables));
{code}

Allow taking advantage of multiple cores while compacting a single CF
---
Key: CASSANDRA-2901
URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
Project: Cassandra
Issue Type: Improvement
Components: Core
Reporter: Jonathan Ellis
Assignee: Jonathan Ellis
Priority: Minor
Fix For: 0.8.4
Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt

Moved from CASSANDRA-1876:

There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers. So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously).

This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound. This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front. You'll also want a small queue size for the serialize-merged-rows executor.

Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads. IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira
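The staged pipeline in the description (one reader per input sstable working at most one row ahead of the merge stage) can be sketched with standard java.util.concurrent primitives. This is an illustrative toy, not Cassandra's ParallelCompactionIterable: rows are plain strings, the "merge" is a simple drain, and all class and variable names are ours.

```java
import java.util.List;
import java.util.concurrent.*;

// Toy sketch of the proposed compaction pipeline: one reader thread per
// input sstable feeding a capacity-1 queue, which bounds memory to roughly
// 2x (one row queued + one row being read, per input).
public class PipelineSketch {
    public static void main(String[] args) throws Exception {
        List<List<String>> sstables = List.of(
            List.of("a", "b", "c"),
            List.of("a", "c", "d"));

        // One bounded queue per input sstable: the reader blocks once it is
        // one row ahead of the merge stage.
        List<BlockingQueue<String>> queues = new java.util.ArrayList<>();
        ExecutorService readers = Executors.newFixedThreadPool(sstables.size());
        for (List<String> sstable : sstables) {
            BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
            queues.add(q);
            readers.submit(() -> {
                for (String row : sstable)
                    q.put(row);        // blocks when the merger falls behind
                q.put("<EOF>");        // sentinel: this input is exhausted
                return null;
            });
        }

        // Single-threaded "merge" for brevity: drain each queue in turn.
        StringBuilder merged = new StringBuilder();
        for (BlockingQueue<String> q : queues)
            for (String row = q.take(); !row.equals("<EOF>"); row = q.take())
                merged.append(row);

        readers.shutdown();
        System.out.println(merged);
    }
}
```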
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13079703#comment-13079703 ] Stu Hood commented on CASSANDRA-2901:
---

I like the addition of close() to AbstractCompactedRow, but... At first glance, I'm not convinced the complexity of this ticket is worth the benefits:
* If compaction is falling behind during normal operation (aka too many sstables), the multithreaded compaction from CASSANDRA-2191 should kick in appropriately, assuming you have reasonable compaction thresholds (for example, you can increase parallelism by lowering the MAX_THRESHOLD)
* For repair-related compactions, it sounds like there are a few other efforts underway to make the tasks less like a major compaction
* Major compactions would benefit from this ticket, but I think our endgame is that major compactions will be a thing of the past

Finally, with CASSANDRA-1608, there should be more and easier options available for parallelizing compaction, since there will be more, smaller files.
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13076246#comment-13076246 ] Sylvain Lebresne commented on CASSANDRA-2901:
---

bq. My thinking was that allowing per-deserializer buffers will keep the pipeline full better.

Makes sense.

bq. It's not so bad, because we can assume N = 2 sstables, and we restrict each deserializer to 1/N of in-memory limit. So I think we come close to 2x overall. (And if we don't, I'd rather adjust our estimate, than make it less performant/more complex.)

I still think that this pipeline is a bit too deep. I agree that each deserializer has a 1/N in-memory limit, but that means that for a given row (a given key if you prefer), we have up to in_memory_limit worth of data in memory (since we have N sstables). And the number of such rows that can be in memory at a given time is:
# 2 * mem_limit for each deserializer (the row in the queue and the one being deserialized)
# mem_limit for each MergeTask in memory. Given that the MergeTask executor has nb_processors threads and a nb_processors queue, this means up to 2 * nb_processors * mem_limit.
# if we want to be exact, the reducer thread can also hold one MergeTask while it is blocked on submitting to the executor.

That is, we can have up to a (2 * nb_processors + 3) blowup. On 8 or 16 cores, we're far from the 2x. Now I understand the willingness to keep the pipeline full, and that in general we shouldn't be too close to this theoretical limit, but I do think that as is, it's too easy to OOM, or to force people to use a very low in-memory limit, which would make this less useful than it should be. I'm also not proposing to complicate things. What I would do is use direct hand-off for the merge task executor and update the maxInMemory limit we give to each deserializer. We could do only the limit update, but we would then need to put it even lower, and I'm not sure it would be a good trade-off.
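Sylvain's worst-case bound can be checked with simple arithmetic. The method and variable names below are ours, and the in-memory compaction limit is treated as 1 unit per row in flight:

```java
// Worked example of the (2 * nb_processors + 3) worst-case memory bound
// described above, counting rows that can be held at once (each worth up
// to the in-memory compaction limit). Names are illustrative.
public class MemoryBlowup {
    static int worstCaseRows(int cores) {
        int deserializers = 2;          // row queued + row being deserialized
        int mergeTasks = 2 * cores;     // executor threads plus an equal-sized queue
        int blockedSubmit = 1;          // reducer holding a MergeTask while blocked
        return deserializers + mergeTasks + blockedSubmit;
    }

    public static void main(String[] args) {
        // Far above the intended 2x on typical machines:
        System.out.println(worstCaseRows(8));    // 2*8 + 3
        System.out.println(worstCaseRows(16));   // 2*16 + 3
    }
}
```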
Other comments:
* We feed NotifyingSSTableIdentityIterator to LazilyCompactedRow. However, in LCR.getEstimatedColumnCount(), we won't report the right count because NSSTII is not an instance of SSTII. Same thing in LCR.iterator: we won't call reset correctly on the wrapped SSTII (imho we should add getColumnCount and reset to the IColumnIterator interface (or make a sub-interface with those), because that 'if...instanceof' business is a bit error prone/ugly).
* We could maybe say in cassandra.yaml how multithreaded_compaction is different from concurrent_compactors, and that multithreaded_compaction is likely only useful for SSDs?
* The bytesRead race should also be fixed in CompactionIterable, and the 'let's use a static final comparator' point stands there too. But maybe we should fix that elsewhere.
* I would have put the code from CompactedRow.close() at the end of LCR.write() instead of adding a new method, as it avoids forgetting to call close, and I don't see a good reason why close would need to be separate.
* We can make PreCompactedRow.removeDeletedAndOldShards a public method and use it in PCI.MergeTask.
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13078591#comment-13078591 ] Jonathan Ellis commented on CASSANDRA-2901:
---

Split out some fixes to the SSTII bytes tracker getting out of sync w/ the underlying stream, and did some cleanup to make the streamed/file versions less divergent. Also adds parallel compaction testing to LazilyCompactedRowTest.

CliTest and DefsTest generate compaction loads (in DefsTest's case, on the Migrations CF -- haven't dug into CliTest as much) that break w/ parallel enabled, although the test doesn't actually fail (argh). Haven't figured out what's causing that, and haven't come up with a way to reproduce in a real test yet. The DefsTest does mix lazy/nonlazy iteration in the merge, which may be relevant.

bq. I'm also not proposing to complicate things.

You're right, poor choice of words on my part. Latest gives the merge executor a SynchronousQueue. I think that's a better way to cut the worst case than the Deserializer, for the reason given previously.

bq. 'if...instanceof' business is a bit error prone/ugly

Agreed. Added getColumnCount + reset to an ICountableColumnIterator sub-interface.

bq. say how multithreaded_compaction is different from concurrent_compactors and that multithreaded_compaction is likely only useful for SSDs in cassandra.yaml

done

bq. The bytesRead race should also be fixed in CompactionIterable

done

bq. I would have put the code in CompactedRow.close() at the end of the LCR.write() instead of adding a new method, as it avoids forgetting calling close

I did consider that, but it feels weird to me to have write implicitly call close. I guess we could just change the method name? :)

bq. We can make PreCompactedRow.removeDeletedAndOldShards a public method and use it in PCI.MergeTask

done
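The direct hand-off arrangement mentioned above (a SynchronousQueue, plus a rejection handler that blocks the submitter until a worker is free) can be sketched with the generic java.util.concurrent pattern. This is not the actual DebuggableThreadPoolExecutor code, just an illustration of the idea:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Direct hand-off merge executor sketch: a SynchronousQueue holds no tasks,
// so when all threads are busy the rejection handler blocks the submitter
// instead of dropping the task. In-flight work is capped at the thread count.
public class HandoffExecutor {
    public static void main(String[] args) throws Exception {
        int threads = 2;
        ThreadPoolExecutor merger = new ThreadPoolExecutor(
            threads, threads, 0, TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            (task, pool) -> {
                try {
                    // Block until a worker takes the task off the queue.
                    pool.getQueue().put(task);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

        // Submit more tasks than threads; the producer blocks as needed.
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 10; i++)
            merger.execute(done::incrementAndGet);

        merger.shutdown();
        merger.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(done.get());
    }
}
```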
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13073477#comment-13073477 ] Sylvain Lebresne commented on CASSANDRA-2901:
---

Comments:
* PCI.Reducer.getCompactedRow unwraps NotifyingSSTableIterators, so their close() function won't be called (as a side note, it doesn't seem like we ever call close() on the SSTableIdentityIterator).
* The MergeTask executor has a bounded queue (and number of threads), so tasks can be rejected. If we want submitters to block when the queue is full and all threads are occupied, we need to reuse the trick of DebuggableThreadPoolExecutor.
* Deserializer uses a queue of size 1 to queue up to 1 row while it deserializes the next one. However, we already queue up rows in the MergeTask executor, so it feels like it would be simple to use direct hand-off here. It would make it easier to reason about how many rows are in memory at any given time, for instance.
* More generally, the memory blowup is (potentially) much more than the 2x (compared to mono-threaded) in the description of this ticket. I think that right now we may have:
** 1 for the row being deserialized
** 1 for the row in the Deserializer queue
** nbAvailProcessors for the rows in the MergeTask executor queue (each MergeTask can contain up to 'InMemoryCompactionLimit' worth of data)
** 1 for the row being merged
Note that if we really want to get to (roughly) the 2x in the description of this ticket, we need direct hand-off for both the Deserializer queue *and* the merge executor. I would be fine queuing a few tasks in the merge executor though, if that can help with throughput, but I'm not even sure it will.
* MergeTask calls removeDeleted and removeOldShards on the compacted cf, but they are also called in the constructor of PreCompactedRow a little bit later (we should probably remove the occurrence in PreCompactedRow, as it's still multi-threaded while in the MergeTask).
* In PCI.Reducer.getCompactedRow, in the case where inMemory == false, it seems we use the SSTII even for rows that were already read by the Deserializer; we should use the row instead to avoid deserializing twice.

Nitpicks:
* In CompactionIterable (and PCI), we create one ComparatorIColumnIterator each time instead of having a private static final one (as was the case prior to this patch). Granted, we don't create compaction tasks quickly enough that it would really matter much, but it seems like a good habit to be nice to the GC :)
* This is not due to this patch, but there is a race when updating bytesRead, such that a user could get a 0 bytesRead temporarily in the middle of a big compaction (and bytesRead should probably be volatile, since it won't be read by the same thread that writes it).
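The bytesRead nitpick can be illustrated with a minimal progress counter. Class and method names below are hypothetical, not Cassandra's:

```java
// Minimal illustration of the bytesRead visibility fix: the compaction
// thread updates progress while a different thread reads it for reporting,
// so the field needs safe publication.
public class CompactionProgress {
    // volatile guarantees the reporting thread sees the latest write;
    // updating with a single += per row (single writer) avoids the
    // transient-zero window a reset-then-accumulate scheme would have.
    private volatile long bytesRead;

    void rowCompacted(long rowBytes) { bytesRead += rowBytes; }  // single writer only
    long getBytesRead()              { return bytesRead; }

    public static void main(String[] args) {
        CompactionProgress p = new CompactionProgress();
        p.rowCompacted(100);
        p.rowCompacted(50);
        System.out.println(p.getBytesRead());
    }
}
```

Note that `+=` on a volatile is not atomic; this only works because exactly one thread writes. With multiple writers, an AtomicLong would be the safe choice.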
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13072691#comment-13072691 ] Yewei Zhang commented on CASSANDRA-2901:
---

Yes, you are right. The deserialization part should be done per sstable. In the junit test, I hit the error you just described.
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13072432#comment-13072432 ] Yewei Zhang commented on CASSANDRA-2901:
---

Jonathan, thanks for the comments. Let me look into the ReaderThread to make it multi-threaded.

bq. I don't see the reason to have two different sentinel conditions, why not just use NO_ROW in both cases?

This is because the expected keys are sorted. NO_KEY could otherwise show up at the top when the queue has not yet been consumed.
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13072482#comment-13072482 ] Yewei Zhang commented on CASSANDRA-2901:
---

bq. ReaderThread multithreads the merges but it looks like reading the source sstables is still single-threaded (per merge). Somehow we need to get the PrecompactedRow row.getColumnFamilyWithColumns call in its own thread. Again I like the SSTII wrapper that uses a Future to pull the data from a task on a (per-source-sstable) executor pattern here, but I'm sure there are other options. (Be careful to let LazilyCR tasks stay single-threaded, though.)

Hmm, looking more into the implementation, the serialization is done in the constructor and is handled in the getReduced() method, so the serialization is handled in multiple threads. I think it is very hard for LazilyCR to be single-threaded. To make this happen, there has to be a mechanism to tell the executor to hold the other threads and let only the LazilyCR thread do the work. Maybe I am missing something here.

The approach I took is to set the maximumLimit to (max memory) / pool size. This is not ideal either, since only in the worst-case scenario are all threads handling that much data.
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13072687#comment-13072687 ] Jonathan Ellis commented on CASSANDRA-2901:
---

bq. serialization is done in the constructor and is handled in getReduced() method. so the serialization is handled in multi threads

It looks to me like it's handled in the same threads that do the merging, so this is not going to saturate the CPU very well, since the CPU-intensive part (merging) has to wait for the I/O-intensive part (deserializing).

It also looks like there's a correctness bug here, in that multiple executor threads can be attempting to deserialize at the same time from the same scanner, which will cause problems (SSTS/SSTII are not threadsafe).
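The fix suggested earlier in the thread (wrap each source sstable's iterator in something that pulls rows via a Future from a per-sstable, single-threaded executor) might look roughly like this. The names are illustrative stand-ins for SSTableScanner/SSTII, not actual Cassandra classes:

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;

// Sketch of confining a non-thread-safe scanner to one dedicated thread:
// each source sstable gets its own single-threaded executor, and callers
// receive each deserialized row as a Future. Because the executor has one
// thread, the scanner is never touched concurrently.
public class PerSSTableReader {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Iterator<String> scanner;   // stand-in for the real scanner

    PerSSTableReader(Iterator<String> scanner) { this.scanner = scanner; }

    // Deserialization happens on this sstable's thread; the caller blocks
    // only when it actually needs the row.
    Future<String> nextRow() {
        return executor.submit(() -> scanner.hasNext() ? scanner.next() : null);
    }

    void close() { executor.shutdown(); }

    public static void main(String[] args) throws Exception {
        PerSSTableReader reader =
            new PerSSTableReader(List.of("row1", "row2").iterator());
        System.out.println(reader.nextRow().get());
        System.out.println(reader.nextRow().get());
        reader.close();
    }
}
```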
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13072054#comment-13072054 ] Yewei Zhang commented on CASSANDRA-2901:
---

Please help review and see if the workflow and logic are correct. Some junit tests are broken and I am working on them.
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072156#comment-13072156 ]

Jonathan Ellis commented on CASSANDRA-2901:
-------------------------------------------

Thanks, Yewei! Comments:

- I think we can simplify the wait-for-row-to-be-merged logic by noting that CompactionTask is itself single-threaded. So I'd have PCI.next return an AbstractCompactedRow subclass (FutureACR?) that knows how to wait for the merge to finish. Then we don't need any special logic in PCI itself; we can just pull rows-being-merged off in order and leave the blocking until the merge finishes to CompactionTask.
- ReaderThread multithreads the merges, but it looks like reading the source sstables is still single-threaded (per merge). Somehow we need to get the PrecompactedRow row.getColumnFamilyWithColumns call into its own thread. Again, I like the pattern of an SSTII wrapper that uses a Future to pull the data from a task on a (per-source-sstable) executor, but I'm sure there are other options. (Be careful to let LazilyCR tasks stay single-threaded, though.)
- I don't see the reason to have two different sentinel conditions; why not just use NO_ROW in both cases?
- Note on style: it's better to name the things you run on executors Task (e.g. MergerTask) rather than Thread, because MergerThread implies that it is a Thread subclass.

Allow taking advantage of multiple cores while compacting a single CF
---------------------------------------------------------------------

                Key: CASSANDRA-2901
                URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
            Project: Cassandra
         Issue Type: Improvement
         Components: Core
           Reporter: Jonathan Ellis
           Priority: Minor
            Fix For: 0.8.3
        Attachments: 2901.patch

Moved from CASSANDRA-1876:

There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers. So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time); a thread pool (one per core?) merging corresponding rows from each input sstable; and one thread doing serialize + write of the output (this has to wait for the merge threads to complete in order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.

This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front. You'll also want a small queue size for the serialize-merged-rows executor.

Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.

IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira
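The pipeline and FutureACR idea above can be sketched roughly as follows. All names here are illustrative stand-ins, not the real Cassandra classes: "sstables" are just sorted maps of row key to value, and a plain Future plays the role of the proposed FutureACR wrapper. Because submission and consumption are both single-threaded, output stays in key order even though the merges themselves run concurrently on the pool.

```java
import java.util.*;
import java.util.concurrent.*;

public class ParallelCompactionSketch {
    public static void main(String[] args) throws Exception {
        // Two input "sstables": sorted maps from row key to column data.
        SortedMap<Integer, String> sst1 = new TreeMap<>(Map.of(1, "a", 3, "c", 5, "e"));
        SortedMap<Integer, String> sst2 = new TreeMap<>(Map.of(1, "A", 2, "B", 5, "E"));

        // Merge pool: one worker per core, as the description suggests.
        ExecutorService mergePool = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());

        // Read + deserialize stage (collapsed here): the sorted union of keys.
        SortedSet<Integer> keys = new TreeSet<>(sst1.keySet());
        keys.addAll(sst2.keySet());

        // Submit each merge to the pool; the Future is the "FutureACR":
        // the consumer blocks on it only when that row's turn comes.
        Queue<Future<String>> pending = new ArrayDeque<>();
        for (int key : keys) {
            String v1 = sst1.get(key), v2 = sst2.get(key);
            pending.add(mergePool.submit(() ->
                    (v1 == null ? "" : v1) + (v2 == null ? "" : v2)));
        }

        // Serialize + write stage: single-threaded, dequeues in order and
        // blocks until each merge completes, preserving output order.
        StringBuilder out = new StringBuilder();
        for (Future<String> f : pending)
            out.append(f.get());
        mergePool.shutdown();
        System.out.println(out);  // rows merged in key order: aABceE
    }
}
```

A real implementation would additionally bound the `pending` queue (the "small queue size for the serialize-merged-rows executor" above) so readers cannot run arbitrarily far ahead of the writer.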
[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13067752#comment-13067752 ]

Jonathan Ellis commented on CASSANDRA-2901:
-------------------------------------------

bq. we could partially parallelize large rows

Thinking about it a little more, CASSANDRA-1608 makes this unnecessary: sstables will be kept small enough that you'll have at most one large row per sstable. (So the multiple-simultaneous-sstable-compactions code that we already have takes care of that.) So keeping to the plan of only parallelizing in-memory merges is fine.
[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13066737#comment-13066737 ]

Jonathan Ellis commented on CASSANDRA-2901:
-------------------------------------------

That's an interesting idea, but the more I think about it, the less convinced I am that it's an easy win.

First, the premise that compaction is GC-intensive should be qualified: it can help cause young-gen collections, but almost none of what it allocates will ever be promoted to the old generation, which is what most people worry about. Small rows are compacted quickly enough not to be promoted, and large rows compact column-at-a-time, which will also not live long enough to be promoted. (If you are seeing medium-size rows get tenured, consider reducing in_memory_compaction_limit_in_mb.)

Second, it's harder than it looks to actually push compaction out to another process, because you have basically three choices:

- use Runtime.exec or ProcessBuilder
- use JNA and vfork
- run a separate, always-on compaction daemon and communicate with it over RMI or other IPC

The first of these is implemented using fork on Linux, which can cause spurious OOMs when running in an environment with overcommit disabled (which is generally accepted as best practice in a server environment). Overcommit aside, copying even just the page table for a largish heap is expensive: http://lwn.net/Articles/360509/

vfork avoids copying the parent process's page table, but is obviously not completely portable, so we'd have to keep in-process compaction around as a fallback option. Neither of these makes it easy to communicate back to the parent Cassandra process which cached rows should be invalidated (CASSANDRA-2305). This may be something we can live with (we did for years), but it's a regression nevertheless.

The compaction daemon approach avoids the above problems but adds substantial implementation complexity.

tl;dr: you're welcome to experiment with it, but I don't think it's at all clear yet that the cost/benefit is there.
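For concreteness, the first of the three options above would look roughly like the sketch below. The standalone compaction tool does not exist, so `echo` stands in for it here just to keep the example runnable; on Linux this path goes through fork+exec, which is exactly where the page-table-copy and overcommit concerns come from.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;

public class SpawnCompactionSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical: a real implementation would exec a standalone
        // compaction tool with the sstable paths as arguments.
        ProcessBuilder pb = new ProcessBuilder("echo", "compacting");
        pb.redirectErrorStream(true);  // fold the child's stderr into stdout
        Process p = pb.start();

        // Relay the child's output; a real version would parse progress
        // and the list of cache keys to invalidate (the CASSANDRA-2305
        // problem mentioned above).
        try (BufferedReader r = new BufferedReader(
                new InputStreamReader(p.getInputStream()))) {
            String line;
            while ((line = r.readLine()) != null)
                System.out.println(line);
        }
        int exit = p.waitFor();
        System.out.println("exit=" + exit);
    }
}
```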
[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF
[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13065851#comment-13065851 ]

Wojciech Meler commented on CASSANDRA-2901:
-------------------------------------------

Maybe it would be nice to spawn a separate compaction process? Compaction is a quite GC-intensive operation, so maybe it makes sense to separate it from the server? It would also be nice to have a CLI tool to compact files without a Cassandra server, for backup purposes; why not spawn such a tool from the server?