Runtian commented on code in PR #3700:
URL: https://github.com/apache/cassandra/pull/3700#discussion_r2450390369
##########
test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java:
##########
@@ -122,4 +132,53 @@ public void test_only_coordinator_chooses_index_for_query()
});
}
}
+
+ @Test
+ public void test_secondary_rebuild_with_small_memtable_memory()
+ {
+ // populate data
+ for (int i = 0 ; i < 100 ; ++i)
+ cluster.coordinator(1).execute(String.format("INSERT INTO %s (k,
v) VALUES (?, ?)", tableName), ConsistencyLevel.ALL, i,
generateRandomString(50000));
+
+ cluster.forEach(i -> i.flush(KEYSPACE));
+
+ // restart node 1 with small memtable allocation so that index rebuild
will cause memtable flush which will need
+ // to reclaim the memory. see CASSANDRA-19564
+ waitOn(cluster.get(1).shutdown());
+ cluster.get(1).config().set("memtable_heap_space", "1MiB");
Review Comment:
Yeah, this is a valid concern, however, this test class starts the nodes in
the `BeforeClass` function. I think will just restore the config in the end of
the test for the first node.
##########
test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java:
##########
@@ -23,12 +23,20 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
Review Comment:
updated
##########
test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java:
##########
@@ -122,4 +132,53 @@ public void test_only_coordinator_chooses_index_for_query()
});
}
}
+
+ @Test
+ public void test_secondary_rebuild_with_small_memtable_memory()
+ {
+ // populate data
+ for (int i = 0 ; i < 100 ; ++i)
+ cluster.coordinator(1).execute(String.format("INSERT INTO %s (k,
v) VALUES (?, ?)", tableName), ConsistencyLevel.ALL, i,
generateRandomString(50000));
Review Comment:
added a function to support random string with length.
##########
test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java:
##########
@@ -122,4 +132,53 @@ public void test_only_coordinator_chooses_index_for_query()
});
}
}
+
+ @Test
+ public void test_secondary_rebuild_with_small_memtable_memory()
+ {
+ // populate data
+ for (int i = 0 ; i < 100 ; ++i)
+ cluster.coordinator(1).execute(String.format("INSERT INTO %s (k,
v) VALUES (?, ?)", tableName), ConsistencyLevel.ALL, i,
generateRandomString(50000));
+
+ cluster.forEach(i -> i.flush(KEYSPACE));
+
+ // restart node 1 with small memtable allocation so that index rebuild
will cause memtable flush which will need
+ // to reclaim the memory. see CASSANDRA-19564
+ waitOn(cluster.get(1).shutdown());
+ cluster.get(1).config().set("memtable_heap_space", "1MiB");
+ cluster.get(1).startup();
+ String tableNameWithoutKeyspaceName = tableName.split("\\.")[1];
+ String indexName = String.format("v_index_%d", seq.get());
+ Runnable task = cluster.get(1).runsOnInstance(
+ () -> {
+ ColumnFamilyStore cfs =
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableNameWithoutKeyspaceName);
+
cfs.indexManager.rebuildIndexesBlocking(Sets.newHashSet(Arrays.asList(indexName)));
+ }
+ );
+ ExecutorService es = Executors.newFixedThreadPool(1);
+ Future future = es.submit(task);
Review Comment:
updated
##########
src/java/org/apache/cassandra/index/SecondaryIndexManager.java:
##########
@@ -33,6 +33,8 @@
import com.google.common.collect.*;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.FutureCallback;
+
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
Review Comment:
updated
##########
src/java/org/apache/cassandra/index/SecondaryIndexManager.java:
##########
@@ -918,14 +920,21 @@ public void indexPartition(DecoratedKey key, Set<Index>
indexes, int pageSize, R
SinglePartitionPager pager = new SinglePartitionPager(cmd, null,
ProtocolVersion.CURRENT);
while (!pager.isExhausted())
{
+ UnfilteredRowIterator partition;
try (ReadExecutionController controller =
cmd.executionController();
- WriteContext ctx =
keyspace.getWriteHandler().createContextForIndexing();
UnfilteredPartitionIterator page =
pager.fetchPageUnfiltered(baseCfs.metadata(), pageSize, controller))
{
if (!page.hasNext())
break;
- try (UnfilteredRowIterator partition = page.next())
+ try (UnfilteredRowIterator onePartition = page.next())
+ {
+ partition =
ImmutableBTreePartition.create(onePartition).unfilteredIterator();
+ }
+ }
+
+ try (WriteContext ctx =
keyspace.getWriteHandler().createContextForIndexing())
Review Comment:
added
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]