Revert CASSANDRA-1337, comprising commits ef23335, f17fbac, and 9cf915f.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/29fed1f1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/29fed1f1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/29fed1f1 Branch: refs/heads/trunk Commit: 29fed1f18188cfcd71c817db394c1087e0698dbd Parents: fe784f5 Author: Jonathan Ellis <jbel...@apache.org> Authored: Thu Aug 16 15:42:30 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu Aug 16 15:42:30 2012 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/service/StorageProxy.java | 60 ++++---------- 1 files changed, 17 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/29fed1f1/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 8d0e0b3..9d55739 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -853,23 +853,6 @@ public class StorageProxy implements StorageProxyMBean int columnsCount = 0; rows = new ArrayList<Row>(); List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range); - - // get the cardinality of this index based on row count - // use this info to decide how many scans to do in parallel - Table table = Table.open(command.keyspace); - long estimatedKeysPerRange = table.getColumnFamilyStore(command.column_family) - .estimateKeys() / table.getReplicationStrategy().getReplicationFactor(); - - int concurrencyFactor = (int) (command.maxResults / (estimatedKeysPerRange + 1)); - if (concurrencyFactor <= 0 || command.maxIsColumns) - concurrencyFactor = 1; - else if (concurrencyFactor > 
ranges.size()) - concurrencyFactor = ranges.size(); - - // parallel scan handlers - List<ReadCallback<RangeSliceReply, Iterable<Row>>> scanHandlers = new ArrayList<ReadCallback<RangeSliceReply, Iterable<Row>>>(concurrencyFactor); - - int parallelHandlers = concurrencyFactor; for (AbstractBounds<RowPosition> range : ranges) { RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace, @@ -904,7 +887,6 @@ public class StorageProxy implements StorageProxyMBean { throw new AssertionError(e); } - parallelHandlers--; } else { @@ -921,36 +903,28 @@ public class StorageProxy implements StorageProxyMBean logger.debug("reading " + nodeCmd + " from " + endpoint); } - scanHandlers.add(handler); - - if (scanHandlers.size() >= parallelHandlers) + try { - for (ReadCallback<RangeSliceReply, Iterable<Row>> scanHandler : scanHandlers) + for (Row row : handler.get()) { - try - { - for (Row row : scanHandler.get()) - { - rows.add(row); - columnsCount += row.getLiveColumnCount(); - logger.debug("range slices read {}", row.key); - } - FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getRangeRpcTimeout()); - } - catch (TimeoutException ex) - { - if (logger.isDebugEnabled()) - logger.debug("Range slice timeout: {}", ex.toString()); - throw ex; - } - catch (DigestMismatchException e) - { - throw new AssertionError(e); // no digests in range slices yet - } + rows.add(row); + columnsCount += row.getLiveColumnCount(); + logger.debug("range slices read {}", row.key); } - scanHandlers.clear(); //go back for more + FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout()); + } + catch (TimeoutException ex) + { + if (logger.isDebugEnabled()) + logger.debug("Range slice timeout: {}", ex.toString()); + throw ex; + } + catch (DigestMismatchException e) + { + throw new AssertionError(e); // no digests in range slices yet } } + // if we're done, great, otherwise, move to the next range int count = nodeCmd.maxIsColumns ? 
columnsCount : rows.size(); if (count >= nodeCmd.maxResults)