ivakegg closed pull request #356: ACCUMULO-4777 Removed the unused sequence
generator.
URL: https://github.com/apache/accumulo/pull/356
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one made
from a fork) once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 08232234db..26762fbd85 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -263,13 +263,13 @@
"The maximum size for each write-ahead log. See comment for property
tserver.memory.maps.max"),
TSERV_WALOG_MAX_AGE("tserver.walog.max.age", "24h",
PropertyType.TIMEDURATION, "The maximum age for each write-ahead log."),
TSERV_WALOG_TOLERATED_CREATION_FAILURES("tserver.walog.tolerated.creation.failures",
"50", PropertyType.COUNT,
- "The maximum number of failures tolerated when creating a new WAL file
within the period specified by tserver.walog.failures.period."
- + " Exceeding this number of failures in the period causes the
TabletServer to exit."),
+ "The maximum number of failures tolerated when creating a new WAL file.
Values < 0 will allow unlimited creation failures."
+ + " Exceeding this number of failures consecutively trying to create
a new WAL causes the TabletServer to exit."),
TSERV_WALOG_TOLERATED_WAIT_INCREMENT("tserver.walog.tolerated.wait.increment",
"1000ms", PropertyType.TIMEDURATION,
- "The amount of time to wait between failures to create a WALog."),
+ "The amount of time to wait between failures to create or write a
WALog."),
// Never wait longer than 5 mins for a retry
TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION("tserver.walog.maximum.wait.duration",
"5m", PropertyType.TIMEDURATION,
- "The maximum amount of time to wait after a failure to create a WAL
file."),
+ "The maximum amount of time to wait after a failure to create or write a
WAL file."),
TSERV_MAJC_DELAY("tserver.compaction.major.delay", "30s",
PropertyType.TIMEDURATION,
"Time a tablet server will sleep between checking which tablets need
compaction."),
TSERV_MAJC_THREAD_MAXOPEN("tserver.compaction.major.thread.files.open.max",
"10", PropertyType.COUNT,
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
index e84b1af325..1f55d72fe6 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
@@ -25,12 +25,14 @@
public class Retry {
private static final Logger log = LoggerFactory.getLogger(Retry.class);
+ public static final long MAX_RETRY_DISABLED = -1;
+
private long maxRetries, maxWait, waitIncrement;
private long retriesDone, currentWait;
/**
* @param maxRetries
- * Maximum times to retry
+ * Maximum times to retry or MAX_RETRY_DISABLED if no maximum
* @param startWait
* The amount of time (ms) to wait for the initial retry
* @param maxWait
@@ -46,6 +48,18 @@ public Retry(long maxRetries, long startWait, long
waitIncrement, long maxWait)
this.currentWait = startWait;
}
+ /**
+ * @param startWait
+ * The amount of time (ms) to wait for the initial retry
+ * @param maxWait
+ * The maximum wait (ms)
+ * @param waitIncrement
+ * The amount of time (ms) to increment next wait time by
+ */
+ public Retry(long startWait, long waitIncrement, long maxWait) {
+ this(MAX_RETRY_DISABLED, startWait, waitIncrement, maxWait);
+ }
+
// Visible for testing
long getMaxRetries() {
return maxRetries;
@@ -86,8 +100,12 @@ void setMaxWait(long maxWait) {
this.maxWait = maxWait;
}
+ public boolean isMaxRetryDisabled() {
+ return maxRetries < 0;
+ }
+
public boolean canRetry() {
- return retriesDone < maxRetries;
+ return isMaxRetryDisabled() || (retriesDone < maxRetries);
}
public void useRetry() {
diff --git
a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
index 63a1241700..aa6da20fa8 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
@@ -25,6 +25,18 @@
private final long maxRetries, startWait, maxWait, waitIncrement;
+ /**
+ * Create a retry factory for retries with a limit
+ *
+ * @param maxRetries
+ * The maximum number of retries
+ * @param startWait
+ * The wait ms for the first retry
+ * @param waitIncrement
+ * The amount of ms to increment the wait on subsequent retries
+ * @param maxWait
+ * The max amount of wait time between retries
+ */
public RetryFactory(long maxRetries, long startWait, long waitIncrement,
long maxWait) {
this.maxRetries = maxRetries;
this.startWait = startWait;
@@ -32,6 +44,23 @@ public RetryFactory(long maxRetries, long startWait, long
waitIncrement, long ma
this.waitIncrement = waitIncrement;
}
+ /**
+ * Create a retry factory for retries that have no limit
+ *
+ * @param startWait
+ * The wait ms for the first retry
+ * @param waitIncrement
+ * The amount of ms to increment the wait on subsequent retries
+ * @param maxWait
+ * The max amount of wait time between retries
+ */
+ public RetryFactory(long startWait, long waitIncrement, long maxWait) {
+ this.maxRetries = Retry.MAX_RETRY_DISABLED;
+ this.startWait = startWait;
+ this.maxWait = maxWait;
+ this.waitIncrement = waitIncrement;
+ }
+
public Retry create() {
return new Retry(maxRetries, startWait, waitIncrement, maxWait);
}
diff --git
a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java
b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java
index cb3d608f1d..9ba19a4e67 100644
---
a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java
+++
b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java
@@ -36,4 +36,16 @@ public void properArgumentsInRetry() {
Assert.assertEquals(waitIncrement, retry.getWaitIncrement());
}
+ @Test
+ public void properArgumentsInUnlimitedRetry() {
+ long startWait = 50l, maxWait = 5000l, waitIncrement = 500l;
+ RetryFactory factory = new RetryFactory(startWait, waitIncrement, maxWait);
+ Retry retry = factory.create();
+
+ Assert.assertEquals(Retry.MAX_RETRY_DISABLED, retry.getMaxRetries());
+ Assert.assertEquals(startWait, retry.getCurrentWait());
+ Assert.assertEquals(maxWait, retry.getMaxWait());
+ Assert.assertEquals(waitIncrement, retry.getWaitIncrement());
+ }
+
}
diff --git
a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java
b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java
index e37af015e4..6bbd1ffb80 100644
--- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java
+++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java
@@ -28,10 +28,14 @@
private Retry retry;
long initialWait = 1000l, waitIncrement = 1000l, maxRetries = 5;
+ private Retry unlimitedRetry1;
+ private Retry unlimitedRetry2;
@Before
public void setup() {
retry = new Retry(maxRetries, initialWait, waitIncrement, maxRetries *
1000l);
+ unlimitedRetry1 = new Retry(initialWait, waitIncrement, maxRetries *
1000l);
+ unlimitedRetry2 = new Retry(-10, initialWait, waitIncrement, maxRetries *
1000l);
}
@Test
@@ -124,4 +128,23 @@ public void testBoundedWaitIncrement() throws
InterruptedException {
EasyMock.verify(retry);
}
+
+ @Test
+ public void testIsMaxRetryDisabled() {
+ Assert.assertFalse(retry.isMaxRetryDisabled());
+ Assert.assertTrue(unlimitedRetry1.isMaxRetryDisabled());
+ Assert.assertTrue(unlimitedRetry2.isMaxRetryDisabled());
+ Assert.assertEquals(Retry.MAX_RETRY_DISABLED,
unlimitedRetry1.getMaxRetries());
+ Assert.assertEquals(-10, unlimitedRetry2.getMaxRetries());
+ }
+
+ @Test
+ public void testUnlimitedRetry() {
+ for (int i = 0; i < Integer.MAX_VALUE; i++) {
+ Assert.assertTrue(unlimitedRetry1.canRetry());
+ unlimitedRetry1.useRetry();
+ Assert.assertTrue(unlimitedRetry2.canRetry());
+ unlimitedRetry2.useRetry();
+ }
+ }
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 0f28601fb4..3f56def9f7 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -372,14 +372,16 @@ public void run() {
+ minBlockSize + ". Either increase the " +
Property.TSERV_WALOG_MAX_SIZE + " or decrease
dfs.namenode.fs-limits.min-block-size in hdfs-site.xml.");
final long toleratedWalCreationFailures =
aconf.getCount(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES);
- final long walCreationFailureRetryIncrement =
aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT);
- final long walCreationFailureRetryMax =
aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION);
- // Tolerate `toleratedWalCreationFailures` failures, waiting
`walCreationFailureRetryIncrement` milliseconds after the first failure,
- // incrementing the next wait period by the same value, for a maximum of
`walCreationFailureRetryMax` retries.
- final RetryFactory walCreationRetryFactory = new
RetryFactory(toleratedWalCreationFailures, walCreationFailureRetryIncrement,
- walCreationFailureRetryIncrement, walCreationFailureRetryMax);
-
- logger = new TabletServerLogger(this, walogMaxSize, syncCounter,
flushCounter, walCreationRetryFactory, walogMaxAge);
+ final long walFailureRetryIncrement =
aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT);
+ final long walFailureRetryMax =
aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION);
+ // Tolerate `toleratedWalCreationFailures` failures, waiting
`walFailureRetryIncrement` milliseconds after the first failure,
+ // incrementing the next wait period by the same value, up to a maximum wait of
`walFailureRetryMax` milliseconds.
+ final RetryFactory walCreationRetryFactory = new
RetryFactory(toleratedWalCreationFailures, walFailureRetryIncrement,
walFailureRetryIncrement,
+ walFailureRetryMax);
+ // Tolerate infinite failures for the write, however backing off the same
as for creation failures.
+ final RetryFactory walWritingRetryFactory = new
RetryFactory(walFailureRetryIncrement, walFailureRetryIncrement,
walFailureRetryMax);
+
+ logger = new TabletServerLogger(this, walogMaxSize, syncCounter,
flushCounter, walCreationRetryFactory, walWritingRetryFactory, walogMaxAge);
this.resourceManager = new TabletServerResourceManager(this, fs);
this.security = AuditedSecurityOperation.getInstance(this);
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index c1fd6bb43f..da3ef0b23d 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -16,8 +16,6 @@
*/
package org.apache.accumulo.tserver.log;
-import static
org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -95,15 +93,15 @@
// Use a ReadWriteLock to allow multiple threads to use the log set, but
obtain a write lock to change them
private final ReentrantReadWriteLock logIdLock = new
ReentrantReadWriteLock();
- private final AtomicInteger seqGen = new AtomicInteger();
-
private final AtomicLong syncCounter;
private final AtomicLong flushCounter;
private long createTime = 0;
- private final RetryFactory retryFactory;
- private Retry retry = null;
+ private final RetryFactory createRetryFactory;
+ private Retry createRetry = null;
+
+ private final RetryFactory writeRetryFactory;
static private abstract class TestCallWithWriteLock {
abstract boolean test();
@@ -148,13 +146,15 @@ private static void testLockAndRun(final ReadWriteLock
rwlock, TestCallWithWrite
}
}
- public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong
syncCounter, AtomicLong flushCounter, RetryFactory retryFactory, long maxAge) {
+ public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong
syncCounter, AtomicLong flushCounter, RetryFactory createRetryFactory,
+ RetryFactory writeRetryFactory, long maxAge) {
this.tserver = tserver;
this.maxSize = maxSize;
this.syncCounter = syncCounter;
this.flushCounter = flushCounter;
- this.retryFactory = retryFactory;
- this.retry = null;
+ this.createRetryFactory = createRetryFactory;
+ this.createRetry = null;
+ this.writeRetryFactory = writeRetryFactory;
this.maxAge = maxAge;
}
@@ -224,8 +224,8 @@ synchronized private void createLogger() throws IOException
{
log.info("Using next log " + currentLog.getFileName());
// When we successfully create a WAL, make sure to reset the Retry.
- if (null != retry) {
- retry = null;
+ if (null != createRetry) {
+ createRetry = null;
}
this.createTime = System.currentTimeMillis();
@@ -234,18 +234,18 @@ synchronized private void createLogger() throws
IOException {
throw new RuntimeException("Error: unexpected type seen: " + next);
}
} catch (Exception t) {
- if (null == retry) {
- retry = retryFactory.create();
+ if (null == createRetry) {
+ createRetry = createRetryFactory.create();
}
// We have more retries or we exceeded the maximum number of accepted
failures
- if (retry.canRetry()) {
- // Use the retry and record the time in which we did so
- retry.useRetry();
+ if (createRetry.canRetry()) {
+ // Use the createRetry and record the time in which we did so
+ createRetry.useRetry();
try {
// Backoff
- retry.waitForNextAttempt();
+ createRetry.waitForNextAttempt();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
@@ -348,20 +348,26 @@ synchronized private void close() throws IOException {
}
interface Writer {
- LoggerOperation write(DfsLogger logger, int seq) throws Exception;
+ LoggerOperation write(DfsLogger logger) throws Exception;
}
- private int write(CommitSession commitSession, boolean mincFinish, Writer
writer) throws IOException {
+ private void write(CommitSession commitSession, boolean mincFinish, Writer
writer) throws IOException {
+ write(commitSession, mincFinish, writer, writeRetryFactory.create());
+ }
+
+ private void write(CommitSession commitSession, boolean mincFinish, Writer
writer, Retry writeRetry) throws IOException {
List<CommitSession> sessions = Collections.singletonList(commitSession);
- return write(sessions, mincFinish, writer);
+ write(sessions, mincFinish, writer, writeRetry);
}
- private int write(final Collection<CommitSession> sessions, boolean
mincFinish, Writer writer) throws IOException {
+ private void write(final Collection<CommitSession> sessions, boolean
mincFinish, Writer writer) throws IOException {
+ write(sessions, mincFinish, writer, writeRetryFactory.create());
+ }
+
+ private void write(final Collection<CommitSession> sessions, boolean
mincFinish, Writer writer, Retry writeRetry) throws IOException {
// Work very hard not to lock this during calls to the outside world
int currentLogId = logId.get();
- int seq = -1;
- int attempt = 1;
boolean success = false;
while (!success) {
try {
@@ -379,7 +385,7 @@ private int write(final Collection<CommitSession> sessions,
boolean mincFinish,
if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) {
try {
// Scribble out a tablet definition and then write to the
metadata table
- defineTablet(commitSession);
+ defineTablet(commitSession, writeRetry);
} finally {
commitSession.finishUpdatingLogsUsed();
}
@@ -400,24 +406,26 @@ private int write(final Collection<CommitSession>
sessions, boolean mincFinish,
if (currentLogId == logId.get()) {
// write the mutation to the logs
- seq = seqGen.incrementAndGet();
- if (seq < 0)
- throw new RuntimeException("Logger sequence generator wrapped!
Onos!!!11!eleven");
- LoggerOperation lop = writer.write(copy, seq);
+ LoggerOperation lop = writer.write(copy);
lop.await();
// double-check: did the log set change?
success = (currentLogId == logId.get());
}
} catch (DfsLogger.LogClosedException ex) {
- log.debug("Logs closed while writing, retrying " + attempt);
+ log.debug("Logs closed while writing, retrying attempt " +
writeRetry.retriesCompleted());
} catch (Exception t) {
- if (attempt != 1) {
- log.error("Unexpected error writing to log, retrying attempt " +
attempt, t);
+ log.warn("Failed to write to WAL, retrying attempt " +
writeRetry.retriesCompleted(), t);
+
+ try {
+ // Backoff
+ writeRetry.waitForNextAttempt();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
}
- sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
- attempt++;
+ writeRetry.useRetry();
}
// Some sort of write failure occurred. Grab the write lock and reset
the logs.
// But since multiple threads will attempt it, only attempt the reset
when
@@ -453,42 +461,40 @@ void withWriteLock() throws IOException {
closeForReplication(sessions);
}
});
- return seq;
}
protected void closeForReplication(Collection<CommitSession> sessions) {
// TODO We can close the WAL here for replication purposes
}
- public int defineTablet(final CommitSession commitSession) throws
IOException {
+ public void defineTablet(final CommitSession commitSession, final Retry
writeRetry) throws IOException {
// scribble this into the metadata tablet, too.
- return write(commitSession, false, new Writer() {
+ write(commitSession, false, new Writer() {
@Override
- public LoggerOperation write(DfsLogger logger, int ignored) throws
Exception {
+ public LoggerOperation write(DfsLogger logger) throws Exception {
logger.defineTablet(commitSession.getWALogSeq(),
commitSession.getLogId(), commitSession.getExtent());
return DfsLogger.NO_WAIT_LOGGER_OP;
}
- });
+ }, writeRetry);
}
- public int log(final CommitSession commitSession, final long tabletSeq,
final Mutation m, final Durability durability) throws IOException {
+ public void log(final CommitSession commitSession, final long tabletSeq,
final Mutation m, final Durability durability) throws IOException {
if (durability == Durability.NONE) {
- return -1;
+ return;
}
if (durability == Durability.DEFAULT) {
throw new IllegalArgumentException("Unexpected durability " +
durability);
}
- int seq = write(commitSession, false, new Writer() {
+ write(commitSession, false, new Writer() {
@Override
- public LoggerOperation write(DfsLogger logger, int ignored) throws
Exception {
+ public LoggerOperation write(DfsLogger logger) throws Exception {
return logger.log(tabletSeq, commitSession.getLogId(), m, durability);
}
});
logSizeEstimate.addAndGet(m.numBytes());
- return seq;
}
- public int logManyTablets(Map<CommitSession,Mutations> mutations) throws
IOException {
+ public void logManyTablets(Map<CommitSession,Mutations> mutations) throws
IOException {
final Map<CommitSession,Mutations> loggables = new HashMap<>(mutations);
for (Entry<CommitSession,Mutations> entry : mutations.entrySet()) {
@@ -497,11 +503,11 @@ public int logManyTablets(Map<CommitSession,Mutations>
mutations) throws IOExcep
}
}
if (loggables.size() == 0)
- return -1;
+ return;
- int seq = write(loggables.keySet(), false, new Writer() {
+ write(loggables.keySet(), false, new Writer() {
@Override
- public LoggerOperation write(DfsLogger logger, int ignored) throws
Exception {
+ public LoggerOperation write(DfsLogger logger) throws Exception {
List<TabletMutations> copy = new ArrayList<>(loggables.size());
for (Entry<CommitSession,Mutations> entry : loggables.entrySet()) {
CommitSession cs = entry.getKey();
@@ -519,7 +525,6 @@ public LoggerOperation write(DfsLogger logger, int ignored)
throws Exception {
logSizeEstimate.addAndGet(m.numBytes());
}
}
- return seq;
}
public void minorCompactionFinished(final CommitSession commitSession, final
String fullyQualifiedFileName, final long walogSeq, final Durability durability)
@@ -527,23 +532,23 @@ public void minorCompactionFinished(final CommitSession
commitSession, final Str
long t1 = System.currentTimeMillis();
- int seq = write(commitSession, true, new Writer() {
+ write(commitSession, true, new Writer() {
@Override
- public LoggerOperation write(DfsLogger logger, int ignored) throws
Exception {
+ public LoggerOperation write(DfsLogger logger) throws Exception {
return logger.minorCompactionFinished(walogSeq,
commitSession.getLogId(), fullyQualifiedFileName, durability);
}
});
long t2 = System.currentTimeMillis();
- log.debug(" wrote MinC finish {}: writeTime:{}ms durability:{}", seq,
(t2 - t1), durability);
+ log.debug(" wrote MinC finish: writeTime:{}ms durability:{}", (t2 - t1),
durability);
}
public long minorCompactionStarted(final CommitSession commitSession, final
long seq, final String fullyQualifiedFileName, final Durability durability)
throws IOException {
write(commitSession, false, new Writer() {
@Override
- public LoggerOperation write(DfsLogger logger, int ignored) throws
Exception {
+ public LoggerOperation write(DfsLogger logger) throws Exception {
return logger.minorCompactionStarted(seq, commitSession.getLogId(),
fullyQualifiedFileName, durability);
}
});
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services