HBASE-19358 Improve the stability of log splitting during failover Signed-off-by: Yu Li <l...@apache.org>
Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e4f46f53 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e4f46f53 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e4f46f53 Branch: refs/heads/branch-1.3 Commit: e4f46f53b737d0a5ea0c4c079b6182a5d918e91d Parents: cd1726f Author: Jingyun Tian <tianjy1...@gmail.com> Authored: Tue Jan 2 17:21:32 2018 +0800 Committer: Andrew Purtell <apurt...@apache.org> Committed: Wed Dec 12 18:08:17 2018 -0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/wal/WALSplitter.java | 358 +++++++++++++------ .../TestWALReplayBoundedLogWriterCreation.java | 33 ++ .../TestWALSplitBoundedLogWriterCreation.java | 44 +++ 3 files changed, 330 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e4f46f53/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index cc065e5..1927eb3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -25,6 +25,7 @@ import java.io.InterruptedIOException; import java.text.ParseException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -161,6 +162,9 @@ public class WALSplitter { protected boolean distributedLogReplay; + private final boolean splitWriterCreationBounded; + + // Map encodedRegionName -> lastFlushedSequenceId protected 
Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>(); @@ -180,6 +184,8 @@ public class WALSplitter { // the file being split currently private FileStatus fileBeingSplit; + public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded"; + @VisibleForTesting WALSplitter(final WALFactory factory, Configuration conf, Path rootDir, FileSystem fs, LastSequenceId idChecker, @@ -194,10 +200,10 @@ public class WALSplitter { this.csm = (BaseCoordinatedStateManager)csm; this.walFactory = factory; this.controller = new PipelineController(); - + this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false); entryBuffers = new EntryBuffers(controller, this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", - 128*1024*1024)); + 128*1024*1024), splitWriterCreationBounded); // a larger minBatchSize may slow down recovery because replay writer has to wait for // enough edits before replaying them @@ -212,7 +218,12 @@ public class WALSplitter { LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly."); } this.distributedLogReplay = false; - outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads); + if(splitWriterCreationBounded){ + outputSink = new BoundedLogWriterCreationOutputSink(controller, + entryBuffers, numWriterThreads); + }else { + outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads); + } } } @@ -923,11 +934,19 @@ public class WALSplitter { Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); long totalBuffered = 0; - long maxHeapUsage; + final long maxHeapUsage; + boolean splitWriterCreationBounded; + public EntryBuffers(PipelineController controller, long maxHeapUsage) { + this(controller, maxHeapUsage, false); + } + + public EntryBuffers(PipelineController controller, long maxHeapUsage, + boolean splitWriterCreationBounded) { this.controller = controller; 
this.maxHeapUsage = maxHeapUsage; + this.splitWriterCreationBounded = splitWriterCreationBounded; } /** @@ -967,6 +986,14 @@ public class WALSplitter { * @return RegionEntryBuffer a buffer of edits to be written or replayed. */ synchronized RegionEntryBuffer getChunkToWrite() { + // The core part of limiting opening writers is it doesn't return chunk only if the heap size + // is over maxHeapUsage. Thus it doesn't need to create a writer for each region + // during splitting. It will flush all the logs in the buffer after splitting through a + // threadpool, which means the number of writers it created is under control + if(splitWriterCreationBounded && totalBuffered < maxHeapUsage){ + return null; + } + long biggestSize = 0; byte[] biggestBufferKey = null; @@ -1145,11 +1172,9 @@ public class WALSplitter { protected PipelineController controller; protected EntryBuffers entryBuffers; - protected Map<byte[], SinkWriter> writers = Collections - .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));; - - protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections - .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR)); + protected ConcurrentHashMap<String, SinkWriter> writers = new ConcurrentHashMap<>(); + protected ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum = + new ConcurrentHashMap<>(); protected final List<WriterThread> writerThreads = Lists.newArrayList(); @@ -1195,18 +1220,19 @@ public class WALSplitter { * Update region's maximum edit log SeqNum. 
*/ void updateRegionMaximumEditLogSeqNum(Entry entry) { + synchronized (regionMaximumEditLogSeqNum) { - Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey() - .getEncodedRegionName()); + String encodedRegionName = Bytes.toString(entry.getKey().getEncodedRegionName()); + Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(encodedRegionName); if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) { - regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey() + regionMaximumEditLogSeqNum.put(encodedRegionName, entry.getKey() .getLogSeqNum()); } } } Long getRegionMaximumEditLogSeqNum(byte[] region) { - return regionMaximumEditLogSeqNum.get(region); + return regionMaximumEditLogSeqNum.get(Bytes.toString(region)); } /** @@ -1328,7 +1354,7 @@ public class WALSplitter { } // delete the one with fewer wal entries - private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException { + void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException { long dstMinLogSeqNum = -1L; try (WAL.Reader reader = walFactory.createReader(fs, dst)) { WAL.Entry entry = reader.next(); @@ -1364,7 +1390,7 @@ public class WALSplitter { * Close all of the output streams. * @return the list of paths written. 
*/ - private List<Path> close() throws IOException { + List<Path> close() throws IOException { Preconditions.checkState(!closeAndCleanCompleted); final List<Path> paths = new ArrayList<Path>(); @@ -1381,71 +1407,9 @@ public class WALSplitter { }); CompletionService<Void> completionService = new ExecutorCompletionService<Void>(closeThreadPool); - for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p); - } - completionService.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - WriterAndPath wap = (WriterAndPath) writersEntry.getValue(); - if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p); - try { - wap.w.close(); - } catch (IOException ioe) { - LOG.error("Couldn't close log at " + wap.p, ioe); - thrown.add(ioe); - return null; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten - + " edits, skipped " + wap.editsSkipped + " edits in " - + (wap.nanosSpent / 1000 / 1000) + "ms"); - } - if (wap.editsWritten == 0) { - // just remove the empty recovered.edits file - if (fs.exists(wap.p) && !fs.delete(wap.p, false)) { - LOG.warn("Failed deleting empty " + wap.p); - throw new IOException("Failed deleting empty " + wap.p); - } - return null; - } - - Path dst = getCompletedRecoveredEditsFilePath(wap.p, - regionMaximumEditLogSeqNum.get(writersEntry.getKey())); - try { - if (!dst.equals(wap.p) && fs.exists(dst)) { - deleteOneWithFewerEntries(wap, dst); - } - // Skip the unit tests which create a splitter that reads and - // writes the data without touching disk. - // TestHLogSplit#testThreading is an example. 
- if (fs.exists(wap.p)) { - if (!fs.rename(wap.p, dst)) { - throw new IOException("Failed renaming " + wap.p + " to " + dst); - } - LOG.info("Rename " + wap.p + " to " + dst); - } - } catch (IOException ioe) { - LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe); - thrown.add(ioe); - return null; - } - paths.add(dst); - return null; - } - }); - } - - boolean progress_failed = false; - try { - for (int i = 0, n = this.writers.size(); i < n; i++) { - Future<Void> future = completionService.take(); - future.get(); - if (!progress_failed && reporter != null && !reporter.progress()) { - progress_failed = true; - } - } + boolean progress_failed; + try{ + progress_failed = executeCloseTask(completionService, thrown, paths); } catch (InterruptedException e) { IOException iie = new InterruptedIOException(); iie.initCause(e); @@ -1467,6 +1431,83 @@ public class WALSplitter { return paths; } + boolean executeCloseTask(CompletionService<Void> completionService, + final List<IOException> thrown, final List<Path> paths) + throws InterruptedException, ExecutionException { + for (final Map.Entry<String, SinkWriter> writersEntry : writers.entrySet()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p); + } + completionService.submit(new Callable<Void>() { + @Override public Void call() throws Exception { + WriterAndPath wap = (WriterAndPath) writersEntry.getValue(); + Path dst = closeWriter(writersEntry.getKey(), wap, thrown); + paths.add(dst); + return null; + } + }); + } + boolean progress_failed = false; + for (int i = 0, n = this.writers.size(); i < n; i++) { + Future<Void> future = completionService.take(); + future.get(); + if (!progress_failed && reporter != null && !reporter.progress()) { + progress_failed = true; + } + } + return progress_failed; + } + + Path closeWriter(String encodedRegionName, WriterAndPath wap, List<IOException> thrown) + throws IOException { + if (LOG.isTraceEnabled()) { + 
LOG.trace("Closing " + wap.p); + } + try { + wap.w.close(); + } catch (IOException ioe) { + LOG.error("Couldn't close log at " + wap.p, ioe); + thrown.add(ioe); + return null; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten + + " edits, skipped " + wap.editsSkipped + " edits in " + + (wap.nanosSpent / 1000 / 1000) + "ms"); + } + if (wap.editsWritten == 0) { + // just remove the empty recovered.edits file + if (fs.exists(wap.p) && !fs.delete(wap.p, false)) { + LOG.warn("Failed deleting empty " + wap.p); + throw new IOException("Failed deleting empty " + wap.p); + } + return null; + } + + Path dst = getCompletedRecoveredEditsFilePath(wap.p, + regionMaximumEditLogSeqNum.get(encodedRegionName)); + try { + if (!dst.equals(wap.p) && fs.exists(dst)) { + deleteOneWithFewerEntries(wap, dst); + } + // Skip the unit tests which create a splitter that reads and + // writes the data without touching disk. + // TestHLogSplit#testThreading is an example. 
+ if (fs.exists(wap.p)) { + if (!fs.rename(wap.p, dst)) { + throw new IOException("Failed renaming " + wap.p + " to " + dst); + } + LOG.info("Rename " + wap.p + " to " + dst); + } + } catch (IOException ioe) { + LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe); + thrown.add(ioe); + return null; + } + return dst; + } + + private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException { if (writersClosed) { return thrown; @@ -1490,20 +1531,19 @@ public class WALSplitter { } } } finally { - synchronized (writers) { - WriterAndPath wap = null; - for (SinkWriter tmpWAP : writers.values()) { - try { - wap = (WriterAndPath) tmpWAP; - wap.w.close(); - } catch (IOException ioe) { - LOG.error("Couldn't close log at " + wap.p, ioe); - thrown.add(ioe); - continue; - } - LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " - + (wap.nanosSpent / 1000 / 1000) + "ms)"); + WriterAndPath wap = null; + for (SinkWriter tmpWAP : writers.values()) { + try { + wap = (WriterAndPath) tmpWAP; + wap.w.close(); + } catch (IOException ioe) { + LOG.error("Couldn't close log at " + wap.p, ioe); + thrown.add(ioe); + continue; } + LOG.info( + "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent + / 1000 / 1000) + "ms)"); } writersClosed = true; } @@ -1516,9 +1556,10 @@ public class WALSplitter { * long as multiple threads are always acting on different regions. 
* @return null if this region shouldn't output any logs */ - private WriterAndPath getWriterAndPath(Entry entry) throws IOException { + WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException { byte region[] = entry.getKey().getEncodedRegionName(); - WriterAndPath ret = (WriterAndPath) writers.get(region); + String regionName = Bytes.toString(region); + WriterAndPath ret = (WriterAndPath) writers.get(regionName); if (ret != null) { return ret; } @@ -1532,14 +1573,17 @@ public class WALSplitter { blacklistedRegions.add(region); return null; } - writers.put(region, ret); + + if(reusable) { + writers.put(regionName, ret); + } return ret; } /** * @return a path with a write for that path. caller should close. */ - private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException { + WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException { Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit.getPath().getName()); if (regionedits == null) { @@ -1558,7 +1602,7 @@ public class WALSplitter { return new WriterAndPath(regionedits, w, entry.getKey().getLogSeqNum()); } - private void filterCellByStore(Entry logEntry) { + void filterCellByStore(Entry logEntry) { Map<byte[], Long> maxSeqIdInStores = regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName())); if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) { @@ -1589,12 +1633,16 @@ public class WALSplitter { @Override public void append(RegionEntryBuffer buffer) throws IOException { + appendBuffer(buffer, true); + } + + + WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException{ List<Entry> entries = buffer.entryBuffer; if (entries.isEmpty()) { LOG.warn("got an empty buffer, skipping"); - return; + return null; } - WriterAndPath wap = null; long startTime = System.nanoTime(); @@ -1603,12 +1651,12 @@ public class WALSplitter { for (Entry logEntry : entries) 
{ if (wap == null) { - wap = getWriterAndPath(logEntry); + wap = getWriterAndPath(logEntry, reusable); if (wap == null) { if (LOG.isDebugEnabled()) { LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry); } - return; + return null; } } filterCellByStore(logEntry); @@ -1628,6 +1676,7 @@ public class WALSplitter { LOG.fatal(" Got while writing log entry to log", e); throw e; } + return wap; } @Override @@ -1648,8 +1697,8 @@ public class WALSplitter { public Map<byte[], Long> getOutputCounts() { TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); synchronized (writers) { - for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) { - ret.put(entry.getKey(), entry.getValue().editsWritten); + for (Map.Entry<String, SinkWriter> entry : writers.entrySet()) { + ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten); } } return ret; @@ -1662,6 +1711,105 @@ public class WALSplitter { } /** + * Class that will limit the number of hdfs writers we create to split the logs + */ + class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink{ + + ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>(); + + + public BoundedLogWriterCreationOutputSink(PipelineController controller, + EntryBuffers entryBuffers, int numWriters){ + super(controller, entryBuffers, numWriters); + } + + @Override + public List<Path> finishWritingAndClose() throws IOException { + boolean isSuccessful; + List<Path> result; + try { + isSuccessful = finishWriting(false); + } finally { + result = close(); + } + if (isSuccessful) { + splits = result; + } + return splits; + } + + @Override + boolean executeCloseTask(CompletionService<Void> closeCompletionService, + final List<IOException> thrown, final List<Path> paths) + throws InterruptedException, ExecutionException { + for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) { + LOG.info("Submitting write then 
close of " + + Bytes.toString(buffer.getValue().encodedRegionName)); + closeCompletionService.submit(new Callable<Void>() { + public Void call() throws Exception { + Path dst = writeThenClose(buffer.getValue()); + paths.add(dst); + return null; + } + }); + } + + boolean progress_failed = false; + for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) { + Future<Void> future = closeCompletionService.take(); + future.get(); + if (!progress_failed && reporter != null && !reporter.progress()) { + progress_failed = true; + } + } + return progress_failed; + } + + @Override + public Map<byte[], Long> getOutputCounts() { + Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>(); + for(Map.Entry<String, Long> entry: regionRecoverStatMap.entrySet()){ + regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue()); + } + return regionRecoverStatMapResult; + } + + @Override + public int getNumberOfRecoveredRegions() { + return regionRecoverStatMap.size(); + } + + @Override + public void append(RegionEntryBuffer buffer) throws IOException { + writeThenClose(buffer); + } + + private Path writeThenClose(RegionEntryBuffer buffer) throws IOException { + WriterAndPath wap = appendBuffer(buffer, false); + Path dst = null; + if(wap != null){ + String encodedRegionName = Bytes.toString(buffer.encodedRegionName); + Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten); + if(value != null){ + Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten; + regionRecoverStatMap.put(encodedRegionName, newValue); + } + } + + List<IOException> thrown = new ArrayList<>(); + if(wap != null) { + dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown); + } + + if(!thrown.isEmpty()){ + throw MultipleIOException.createIOException(thrown); + } + return dst; + } + } + + + /** * Class wraps the actual writer which writes data out and related statistics */ public abstract static class SinkWriter { @@ -1720,7 
+1868,7 @@ public class WALSplitter { .synchronizedMap(new TreeMap<TableName, HConnection>()); /** * Map key -> value layout - * <servername>:<table name> -> Queue<Row> + * servername:table name -> Queue(Row) */ private Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap = new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>(); http://git-wip-us.apache.org/repos/asf/hbase/blob/e4f46f53/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java new file mode 100644 index 0000000..36bc63a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.wal.WALSplitter; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestWALReplayBoundedLogWriterCreation extends TestWALReplay { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TestWALReplay.setUpBeforeClass(); + TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e4f46f53/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java new file mode 100644 index 0000000..285e8f3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.wal; + +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit{ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TestWALSplit.setUpBeforeClass(); + TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true); + } + + /** + * The logic of this test is conflict with the limit writers split logic, skip this test + */ + @Test(timeout=300000) + @Ignore + public void testThreadingSlowWriterSmallBuffer() throws Exception { + super.testThreadingSlowWriterSmallBuffer(); + } +} +