keith-turner commented on PR #5497: URL: https://github.com/apache/accumulo/pull/5497#issuecomment-2824646836
This is a test that reproduces the problem. Still working on it. After experimenting with a few things, I discovered that the frequency of flushing the batch writer is key to reproducing this. So I may be able to adjust the test to use much less memory and just flush the batch writer more. This current iteration of the test reproduces the problem, but would not be suitable for an IT because of all the memory it uses. ```diff diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index 442b2f85f9..d88d5341b5 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@ -263,11 +263,15 @@ public class LogSorter { @VisibleForTesting void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException { + log.debug("Writing buffer {}", buffer.size()); + long t1 = System.currentTimeMillis(); String filename = String.format("part-r-%05d.rf", part); Path path = new Path(destPath, filename); FileSystem fs = context.getVolumeManager().getFileSystemByPath(path); Path fullPath = fs.makeQualified(path); + int listCreated = 0; + // convert the LogFileKeys to Keys, sort and collect the mutations Map<Key,List<Mutation>> keyListMap = new TreeMap<>(); for (Pair<LogFileKey,LogFileValue> pair : buffer) { @@ -277,6 +281,7 @@ public class LogSorter { var list = keyListMap.putIfAbsent(k, logFileValue.mutations); if (list != null) { var muts = new ArrayList<>(list); + listCreated++; muts.addAll(logFileValue.mutations); keyListMap.put(logFileKey.toKey(), muts); } @@ -292,6 +297,8 @@ public class LogSorter { writer.append(entry.getKey(), val.toValue()); } } + long t2 = System.currentTimeMillis(); + log.debug("Wrote buffer bs:{} time:{} lc:{}", buffer.size(), (t2 - t1), listCreated); } public void startWatchingForRecoveryLogs(AbstractServer server)
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java b/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java index ef14b75d81..03def6cc2e 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java @@ -18,10 +18,20 @@ */ package org.apache.accumulo.test.functional; +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.List; + import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.iterators.LongCombiner; +import org.apache.accumulo.core.iterators.user.SummingCombiner; import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.MemoryUnit; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.TestIngest; @@ -39,13 +49,17 @@ public class WriteAheadLogIT extends AccumuloClusterHarness { } public static void setupConfig(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.TSERV_WAL_MAX_SIZE, "2M"); + cfg.setProperty(Property.TSERV_WAL_MAX_SIZE, "1G"); cfg.setProperty(Property.GC_CYCLE_DELAY, "1"); cfg.setProperty(Property.GC_CYCLE_START, "1"); cfg.setProperty(Property.MANAGER_RECOVERY_DELAY, "1s"); cfg.setProperty(Property.TSERV_MAJC_DELAY, "1"); cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setProperty(Property.TSERV_MAXMEM, "16G"); + cfg.setProperty(Property.TSERV_WAL_SORT_BUFFER_SIZE, "2G"); hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + 
cfg.setMemory(ServerType.TABLET_SERVER, 8, MemoryUnit.GIGABYTE); + } @Test @@ -65,4 +79,64 @@ public class WriteAheadLogIT extends AccumuloClusterHarness { getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER); VerifyIngest.verifyIngest(c, params); } + + @Test + public void testManyColumns() throws Exception { + // TODO when wal was 2M saw 200K key/vals + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String table = getUniqueNames(1)[0]; + + // disable the versioning iterator + + var ntc = new NewTableConfiguration().withoutDefaultIterators(); + IteratorSetting iter = new IteratorSetting(100, SummingCombiner.class); + SummingCombiner.setColumns(iter, List.of(new IteratorSetting.Column("family"))); + SummingCombiner.setEncodingType(iter, LongCombiner.Type.STRING); + ntc.attachIterator(iter); + c.tableOperations().create(table, ntc); + + byte[] fam = "family".getBytes(UTF_8); + byte[] qual = "qualifier".getBytes(UTF_8); + byte[] val = "1".getBytes(UTF_8); + + final int numToWrite = 20_000_000; + final int flushThreshold = 1_000; + + try (var writer = c.createBatchWriter(table)) { + for (int i = 0; i < numToWrite; i++) { + Mutation m = new Mutation("r1"); + m.put(fam, qual, i, val); + writer.addMutation(m); + if (i > 0 && i % 1_000_000 == 0) { + System.out.println("Wrote " + i); + } + if (i > 0 && i % flushThreshold == 0) { + // The more frequently this is flushed, the more write ahead log entries are created. + // Without this flush there will not be too many WAL entries. The performance problem is + // directly related to the number of WAL entries. 
+ writer.flush(); + } + } + } + + try (var scanner = c.createScanner(table)) { + var siter = scanner.iterator(); + System.out.println(siter.next().getValue()); + } + + getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER); + + c.tableOperations().getConfiguration(table) + .forEach((k, v) -> System.out.println("GC :" + k + "=" + v)); + c.tableOperations().getTableProperties(table) + .forEach((k, v) -> System.out.println("GTP:" + k + "=" + v)); + + System.out.println("Scanning"); + try (var scanner = c.createScanner(table)) { + var siter = scanner.iterator(); + System.out.println(siter.next().getValue()); + } + } + } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org