keith-turner commented on PR #5497:
URL: https://github.com/apache/accumulo/pull/5497#issuecomment-2824646836

   This is a test that reproduces the problem. Still working on it. I 
discovered that the frequency of flushing the batch writer is key to reproducing 
this after experimenting with a few things. So I may be able to adjust the 
test to use much less memory and just flush the batch writer more. This current 
iteration of the test reproduces the problem, but would not be suitable for an 
IT because of all the memory it uses.
   
   ```diff
   diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
   index 442b2f85f9..d88d5341b5 100644
   --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
   +++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
   @@ -263,11 +263,15 @@ public class LogSorter {
      @VisibleForTesting
      void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> 
buffer, int part)
          throws IOException {
   +    log.debug("Writing buffer {}", buffer.size());
   +    long t1 = System.currentTimeMillis();
        String filename = String.format("part-r-%05d.rf", part);
        Path path = new Path(destPath, filename);
        FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);
        Path fullPath = fs.makeQualified(path);
    
   +    int listCreated = 0;
   +
        // convert the LogFileKeys to Keys, sort and collect the mutations
        Map<Key,List<Mutation>> keyListMap = new TreeMap<>();
        for (Pair<LogFileKey,LogFileValue> pair : buffer) {
   @@ -277,6 +281,7 @@ public class LogSorter {
          var list = keyListMap.putIfAbsent(k, logFileValue.mutations);
          if (list != null) {
            var muts = new ArrayList<>(list);
   +        listCreated++;
            muts.addAll(logFileValue.mutations);
            keyListMap.put(logFileKey.toKey(), muts);
          }
   @@ -292,6 +297,8 @@ public class LogSorter {
            writer.append(entry.getKey(), val.toValue());
          }
        }
   +    long t2 = System.currentTimeMillis();
   +    log.debug("Wrote buffer bs:{} time:{} lc:{}", buffer.size(), (t2 - t1), 
listCreated);
      }
    
      public void startWatchingForRecoveryLogs(AbstractServer server)
   diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
   index ef14b75d81..03def6cc2e 100644
   --- 
a/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
   +++ 
b/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
   @@ -18,10 +18,20 @@
     */
    package org.apache.accumulo.test.functional;
    
   +import static java.nio.charset.StandardCharsets.UTF_8;
   +
   +import java.util.List;
   +
    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;
   +import org.apache.accumulo.core.client.IteratorSetting;
   +import org.apache.accumulo.core.client.admin.NewTableConfiguration;
    import org.apache.accumulo.core.conf.Property;
   +import org.apache.accumulo.core.data.Mutation;
   +import org.apache.accumulo.core.iterators.LongCombiner;
   +import org.apache.accumulo.core.iterators.user.SummingCombiner;
    import org.apache.accumulo.harness.AccumuloClusterHarness;
   +import org.apache.accumulo.minicluster.MemoryUnit;
    import org.apache.accumulo.minicluster.ServerType;
    import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
    import org.apache.accumulo.test.TestIngest;
   @@ -39,13 +49,17 @@ public class WriteAheadLogIT extends 
AccumuloClusterHarness {
      }
    
      public static void setupConfig(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
   -    cfg.setProperty(Property.TSERV_WAL_MAX_SIZE, "2M");
   +    cfg.setProperty(Property.TSERV_WAL_MAX_SIZE, "1G");
        cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
        cfg.setProperty(Property.GC_CYCLE_START, "1");
        cfg.setProperty(Property.MANAGER_RECOVERY_DELAY, "1s");
        cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
        cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
   +    cfg.setProperty(Property.TSERV_MAXMEM, "16G");
   +    cfg.setProperty(Property.TSERV_WAL_SORT_BUFFER_SIZE, "2G");
        hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   +    cfg.setMemory(ServerType.TABLET_SERVER, 8, MemoryUnit.GIGABYTE);
   +
      }
    
      @Test
   @@ -65,4 +79,64 @@ public class WriteAheadLogIT extends 
AccumuloClusterHarness {
        
getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER);
        VerifyIngest.verifyIngest(c, params);
      }
   +
   +  @Test
   +  public void testManyColumns() throws Exception {
   +    // TODO when wal was 2M saw 200K key/vals
   +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
   +      String table = getUniqueNames(1)[0];
   +
   +      // disable the versioning iterator
   +
   +      var ntc = new NewTableConfiguration().withoutDefaultIterators();
   +      IteratorSetting iter = new IteratorSetting(100, 
SummingCombiner.class);
   +      SummingCombiner.setColumns(iter, List.of(new 
IteratorSetting.Column("family")));
   +      SummingCombiner.setEncodingType(iter, LongCombiner.Type.STRING);
   +      ntc.attachIterator(iter);
   +      c.tableOperations().create(table, ntc);
   +
   +      byte[] fam = "family".getBytes(UTF_8);
   +      byte[] qual = "qualifier".getBytes(UTF_8);
   +      byte[] val = "1".getBytes(UTF_8);
   +
   +      final int numToWrite = 20_000_000;
   +      final int flushThreshold = 1_000;
   +
   +      try (var writer = c.createBatchWriter(table)) {
   +        for (int i = 0; i < numToWrite; i++) {
   +          Mutation m = new Mutation("r1");
   +          m.put(fam, qual, i, val);
   +          writer.addMutation(m);
   +          if (i > 0 && i % 1_000_000 == 0) {
   +            System.out.println("Wrote " + i);
   +          }
   +          if (i > 0 && i % flushThreshold == 0) {
   +            // The more frequently this is flushed, the more write ahead 
log entries are created.
   +            // Without this flush there will not be too many WAL entries. 
The performance problem is
   +            // directly related to the number of WAL entries.
   +            writer.flush();
   +          }
   +        }
   +      }
   +
   +      try (var scanner = c.createScanner(table)) {
   +        var siter = scanner.iterator();
   +        System.out.println(siter.next().getValue());
   +      }
   +
   +      
getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
   +      
getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER);
   +
   +      c.tableOperations().getConfiguration(table)
   +          .forEach((k, v) -> System.out.println("GC :" + k + "=" + v));
   +      c.tableOperations().getTableProperties(table)
   +          .forEach((k, v) -> System.out.println("GTP:" + k + "=" + v));
   +
   +      System.out.println("Scanning");
   +      try (var scanner = c.createScanner(table)) {
   +        var siter = scanner.iterator();
   +        System.out.println(siter.next().getValue());
   +      }
   +    }
   +  }
    }
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to