Repository: hbase Updated Branches: refs/heads/HBASE-14070.HLC 386b1df1d -> d9a990490
HBASE-18498 Design improvements in Clock.java. - Delete PhysicalClock interface which seems useless given we have "System" implementation of Clock which is basically the same. - Embed systemMonotonic clock into HLC to get rid of redundancy in management of physical component, especially the logic around ensuring monotonicity. - Make max_skew in clocks configurable - Remove isMonotonicallyIncreasing() which is not used. - update logical overflow test to not use hooks but prod code path - Added lots of comments. Change-Id: Ie775d28f15e864e7885b39e503c75670acbf4391 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d9a99049 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d9a99049 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d9a99049 Branch: refs/heads/HBASE-14070.HLC Commit: d9a9904907ef0cf450b76933750d045237713403 Parents: 386b1df Author: Apekshit Sharma <a...@apache.org> Authored: Fri Jul 21 20:21:22 2017 -0700 Committer: Apekshit Sharma <a...@apache.org> Committed: Wed Aug 2 15:14:38 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hbase/Clock.java | 274 ++++++++----------- .../apache/hadoop/hbase/util/AtomicUtils.java | 1 + .../java/org/apache/hadoop/hbase/TestClock.java | 91 +++--- .../hbase/regionserver/HRegionServer.java | 6 +- .../hadoop/hbase/TestClockWithCluster.java | 79 +++--- 5 files changed, 210 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d9a99049/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java index 0e2320f..6a0374e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java +++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java @@ -81,11 +81,6 @@ public interface Clock { boolean isMonotonic(); /** - * @return true if the clock implementation gives monotonically increasing timestamps else false. - */ - boolean isMonotonicallyIncreasing(); - - /** * @return {@link org.apache.hadoop.hbase.TimestampType} */ TimestampType getTimestampType(); @@ -95,6 +90,10 @@ public interface Clock { */ ClockType getClockType(); + /** + * @return {@link TimeUnit} of the physical time used by the clock. + */ + TimeUnit getTimeUnit(); /** * Indicates that Physical Time or Logical Time component has overflowed. This extends @@ -107,41 +106,7 @@ public interface Clock { } } - ////////////////////////////////////////////////////////////////// - // Physical Clock - ////////////////////////////////////////////////////////////////// - - interface PhysicalClock { - /** - * This is a method to get the current time. - * - * @return Timestamp of current time in 64 bit representation corresponding to the particular - * clock - */ - long now() throws RuntimeException; - - /** - * This is a method to get the unit of the physical time used by the clock - * - * @return A {@link TimeUnit} - */ - TimeUnit getTimeUnit(); - } - - class JavaMillisPhysicalClock implements PhysicalClock { - @Override - public long now() { - return EnvironmentEdgeManager.currentTime(); - } - - @Override - public TimeUnit getTimeUnit() { - return TimeUnit.MILLISECONDS; - } - } - - JavaMillisPhysicalClock DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK = - new JavaMillisPhysicalClock(); + Clock SYSTEM_CLOCK = new System(); ////////////////////////////////////////////////////////////////// // Implementation of clocks @@ -149,20 +114,18 @@ public interface Clock { /** * System clock is an implementation of clock which doesn't give any monotonic guarantees. + * Since it directly represents system's actual clock which cannot be changed, update() function + * is no-op. 
*/ - class System implements Clock, PhysicalClock { - private final PhysicalClock physicalClock = DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK; - private final ClockType clockType = ClockType.SYSTEM; - private final TimestampType timestampType = TimestampType.PHYSICAL; - + class System implements Clock { @Override public long now() { - return physicalClock.now(); + return EnvironmentEdgeManager.currentTime(); } @Override public long update(long timestamp) { - return physicalClock.now(); + return EnvironmentEdgeManager.currentTime(); } @Override @@ -171,52 +134,62 @@ public interface Clock { } @Override - public boolean isMonotonicallyIncreasing() { - return false; - } - public TimeUnit getTimeUnit() { - return physicalClock.getTimeUnit(); + return TimeUnit.MILLISECONDS; } @Override - public TimestampType getTimestampType() { return timestampType; } + public TimestampType getTimestampType() { + return TimestampType.PHYSICAL; + } @Override - public ClockType getClockType() { return clockType; } + public ClockType getClockType() { + return ClockType.SYSTEM; + } } /** * System clock is an implementation of clock which guarantees monotonically non-decreasing * timestamps. */ - class SystemMonotonic implements Clock, PhysicalClock { + class SystemMonotonic implements Clock { private final long maxClockSkewInMs; - private final PhysicalClock physicalClock; + private final Clock systemClock; private final AtomicLong physicalTime = new AtomicLong(); - private final ClockType clockType = ClockType.SYSTEM_MONOTONIC; - private final TimestampType timestampType = TimestampType.PHYSICAL; - public SystemMonotonic(PhysicalClock physicalClock, long maxClockSkewInMs) { - this.physicalClock = physicalClock; - this.maxClockSkewInMs = maxClockSkewInMs > 0 ? 
- maxClockSkewInMs : DEFAULT_MAX_CLOCK_SKEW_IN_MS; + public SystemMonotonic(long maxClockSkewInMs) { + this(SYSTEM_CLOCK, maxClockSkewInMs); } + @VisibleForTesting public SystemMonotonic() { - this.physicalClock = DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK; - this.maxClockSkewInMs = DEFAULT_MAX_CLOCK_SKEW_IN_MS; + this(DEFAULT_MAX_CLOCK_SKEW_IN_MS); + } + + @VisibleForTesting + public SystemMonotonic(Clock systemClock) { + this(systemClock, DEFAULT_MAX_CLOCK_SKEW_IN_MS); + } + + @VisibleForTesting + public SystemMonotonic(Clock systemClock, long maxClockSkewInMs) { + this.systemClock = systemClock; + this.maxClockSkewInMs = maxClockSkewInMs > 0 ? + maxClockSkewInMs : DEFAULT_MAX_CLOCK_SKEW_IN_MS; } @Override public long now() { - long systemTime = physicalClock.now(); - updateMax(physicalTime, systemTime); + updateMax(physicalTime, systemClock.now()); return physicalTime.get(); } + /** + * @throws ClockException If timestamp exceeds max clock skew allowed. + */ public long update(long targetTimestamp) throws ClockException { - final long systemTime = physicalClock.now(); + final long systemTime = systemClock.now(); if (maxClockSkewInMs > 0 && (targetTimestamp - systemTime) > maxClockSkewInMs) { throw new ClockException( "Received event with timestamp:" + getTimestampType().toString(targetTimestamp) @@ -233,107 +206,96 @@ public interface Clock { } @Override - public boolean isMonotonicallyIncreasing() { - return false; - } - public TimeUnit getTimeUnit() { - return physicalClock.getTimeUnit(); - } - - @VisibleForTesting - void setPhysicalTime(long time) { - physicalTime.set(time); + return systemClock.getTimeUnit(); } @Override - public TimestampType getTimestampType() { return timestampType; } + public TimestampType getTimestampType() { + return TimestampType.PHYSICAL; + } @Override - public ClockType getClockType() { return clockType; } + public ClockType getClockType() { + return ClockType.SYSTEM_MONOTONIC; + } } - class HLC implements Clock, PhysicalClock { - private 
final PhysicalClock physicalClock; - private final long maxClockSkew; - private final long maxPhysicalTime; - private final long maxLogicalTime; - private long physicalTime; - private long logicalTime; - private final ClockType clockType = ClockType.HLC; - private final TimestampType timestampType = TimestampType.HYBRID; - - public HLC(PhysicalClock physicalClock, long maxClockSkew) { - this.physicalClock = physicalClock; - this.maxClockSkew = maxClockSkew > 0 ? maxClockSkew : DEFAULT_MAX_CLOCK_SKEW_IN_MS; - this.maxPhysicalTime = timestampType.getMaxPhysicalTime(); - this.maxLogicalTime = timestampType.getMaxLogicalTime(); - this.physicalTime = 0; - this.logicalTime = 0; + /** + * HLC clock implementation. + * Monotonicity guarantee of physical component of time comes from {@link SystemMonotonic} clock. + */ + class HLC implements Clock { + private static final TimestampType TIMESTAMP_TYPE = TimestampType.HYBRID; + private static final long MAX_PHYSICAL_TIME = TIMESTAMP_TYPE.getMaxPhysicalTime(); + private static final long MAX_LOGICAL_TIME = TIMESTAMP_TYPE.getMaxLogicalTime(); + private final Clock systemMonotonicClock; + private long currentPhysicalTime = 0; + private long currentLogicalTime = 0; + + public HLC(long maxClockSkewInMs) { + this(new SystemMonotonic(maxClockSkewInMs)); } + @VisibleForTesting public HLC() { - this.physicalClock = DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK; - this.maxClockSkew = DEFAULT_MAX_CLOCK_SKEW_IN_MS; - this.maxPhysicalTime = timestampType.getMaxPhysicalTime(); - this.maxLogicalTime = timestampType.getMaxLogicalTime(); - this.physicalTime = 0; - this.logicalTime = 0; + this(DEFAULT_MAX_CLOCK_SKEW_IN_MS); + } + + /** + * @param systemMonotonicClock Clock to get physical component of time. Should be monotonic + * clock. 
+ */ + @VisibleForTesting + public HLC(Clock systemMonotonicClock) { + assert(systemMonotonicClock.isMonotonic()); + this.systemMonotonicClock = systemMonotonicClock; } @Override public synchronized long now() throws ClockException { - final long systemTime = physicalClock.now(); - - checkPhysicalTimeOverflow(systemTime, maxPhysicalTime); - checkLogicalTimeOverflow(logicalTime, maxLogicalTime); - - if (systemTime <= physicalTime) { - logicalTime++; - } else if (systemTime > physicalTime) { - logicalTime = 0; - physicalTime = systemTime; + final long newSystemTime = systemMonotonicClock.now(); + if (newSystemTime <= currentPhysicalTime) { + currentLogicalTime++; + } else if (newSystemTime > currentPhysicalTime) { + currentLogicalTime = 0; + currentPhysicalTime = newSystemTime; } + checkPhysicalTimeOverflow(newSystemTime, MAX_PHYSICAL_TIME); + checkLogicalTimeOverflow(currentLogicalTime, MAX_LOGICAL_TIME); return toTimestamp(); } /** - * Updates {@link HLC} with the given timestamp received from elsewhere (possibly - * some other node). Returned timestamp is strict greater than msgTimestamp and local - * timestamp. + * Updates {@link HLC} with the given time received from elsewhere (possibly some other node). * - * @param timestamp timestamp from the external message. - * @return a hybrid timestamp of HLC that is strictly greater than local timestamp and - * msgTimestamp - * @throws ClockException + * @param targetTime time from the external message. + * @return a hybrid timestamp of HLC that is strict greater than given {@code targetTime} and + * previously returned times. + * @throws ClockException If timestamp exceeds max clock skew allowed. 
*/ @Override - public synchronized long update(long timestamp) + public synchronized long update(long targetTime) throws ClockException { - final long targetPhysicalTime = timestampType.getPhysicalTime(timestamp); - final long targetLogicalTime = timestampType.getLogicalTime(timestamp); - final long oldPhysicalTime = physicalTime; - final long systemTime = physicalClock.now(); - - physicalTime = Math.max(Math.max(oldPhysicalTime, targetPhysicalTime), systemTime); - checkPhysicalTimeOverflow(systemTime, maxPhysicalTime); - - if (targetPhysicalTime - systemTime > maxClockSkew) { - throw new ClockException("Received event with timestamp:" + - timestampType.toString(timestamp) + " which is greater than allowed clock skew"); - } - if (physicalTime == oldPhysicalTime && oldPhysicalTime == targetPhysicalTime) { - logicalTime = Math.max(logicalTime, targetLogicalTime) + 1; - } else if (physicalTime == targetPhysicalTime) { - logicalTime = targetLogicalTime + 1; - } else if (physicalTime == oldPhysicalTime) { - logicalTime++; + final long targetPhysicalTime = TIMESTAMP_TYPE.getPhysicalTime(targetTime); + final long targetLogicalTime = TIMESTAMP_TYPE.getLogicalTime(targetTime); + final long oldPhysicalTime = currentPhysicalTime; + currentPhysicalTime = systemMonotonicClock.update(targetPhysicalTime); + + checkPhysicalTimeOverflow(currentPhysicalTime, MAX_PHYSICAL_TIME); + + if (currentPhysicalTime == targetPhysicalTime && currentPhysicalTime == oldPhysicalTime) { + currentLogicalTime = Math.max(currentLogicalTime, targetLogicalTime) + 1; + } else if (currentPhysicalTime == targetPhysicalTime) { + currentLogicalTime = targetLogicalTime + 1; + } else if (currentPhysicalTime == oldPhysicalTime) { + currentLogicalTime++; } else { - logicalTime = 0; + currentLogicalTime = 0; } - checkLogicalTimeOverflow(logicalTime, maxLogicalTime); + checkLogicalTimeOverflow(currentLogicalTime, MAX_LOGICAL_TIME); return toTimestamp(); } @@ -342,40 +304,30 @@ public interface Clock { return true; 
} - @Override - public boolean isMonotonicallyIncreasing() { - return true; - } - public TimeUnit getTimeUnit() { - return physicalClock.getTimeUnit(); + return systemMonotonicClock.getTimeUnit(); } private long toTimestamp() { - return timestampType.toTimestamp(getTimeUnit(), physicalTime, logicalTime); - } - - @VisibleForTesting - synchronized void setLogicalTime(long logicalTime) { - this.logicalTime = logicalTime; + return TIMESTAMP_TYPE.toTimestamp(TimeUnit.MILLISECONDS, currentPhysicalTime, + currentLogicalTime); } @VisibleForTesting - synchronized void setPhysicalTime(long physicalTime) { - this.physicalTime = physicalTime; - } - - @VisibleForTesting - synchronized long getLogicalTime() { return logicalTime; } + synchronized long getLogicalTime() { return currentLogicalTime; } @VisibleForTesting - synchronized long getPhysicalTime() { return physicalTime; } + synchronized long getPhysicalTime() { return currentPhysicalTime; } @Override - public TimestampType getTimestampType() { return timestampType; } + public TimestampType getTimestampType() { + return TIMESTAMP_TYPE; + } @Override - public ClockType getClockType() { return clockType; } + public ClockType getClockType() { + return ClockType.HLC; + } } ////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/hbase/blob/d9a99049/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AtomicUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AtomicUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AtomicUtils.java index 35391ee..cc32263 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AtomicUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AtomicUtils.java @@ -46,6 +46,7 @@ public class AtomicUtils { /** * Updates a AtomicLong which is supposed to maintain the maximum values. 
This method is not * synchronized but is thread-safe. + * @return true if {@code max} was updated with {@code value}, else false. */ public static void updateMax(AtomicLong max, long value) { while (true) { http://git-wip-us.apache.org/repos/asf/hbase/blob/d9a99049/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java index 2476c51..f9ad8e4 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java @@ -34,7 +34,7 @@ import static org.mockito.Mockito.*; @Category(SmallTests.class) public class TestClock { - static final Clock.PhysicalClock PHYSICAL_CLOCK = mock(Clock.PhysicalClock.class); + static final Clock MOCK_CLOCK = mock(Clock.class); static final long MAX_CLOCK_SKEW_IN_MS = 1000; @Rule @@ -84,7 +84,7 @@ public class TestClock { @Before public void setUp() { - when(PHYSICAL_CLOCK.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS); + when(MOCK_CLOCK.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS); } // All Clocks Tests @@ -92,40 +92,47 @@ public class TestClock { @Test public void testSystemMonotonicNow() { MonotonicityCheckerClock systemMonotonic = - new MonotonicityCheckerClock(new Clock.SystemMonotonic(PHYSICAL_CLOCK, 30000), false); + new MonotonicityCheckerClock(new Clock.SystemMonotonic(MOCK_CLOCK), false); // case 1: Set time and assert - when(PHYSICAL_CLOCK.now()).thenReturn(100L); + when(MOCK_CLOCK.now()).thenReturn(100L); assertEquals(100, systemMonotonic.now()); // case 2: Go back in time and check monotonic property. - when(PHYSICAL_CLOCK.now()).thenReturn(99L); + when(MOCK_CLOCK.now()).thenReturn(99L); assertEquals(100, systemMonotonic.now()); // case 3: system time goes ahead compared to previous timestamp. 
- when(PHYSICAL_CLOCK.now()).thenReturn(101L); + when(MOCK_CLOCK.now()).thenReturn(101L); assertEquals(101, systemMonotonic.now()); systemMonotonic.assertMonotonic(); } + /** + * Tests that + * - Progressing mock clock progresses SystemMonotonic clock. + * - Skews in the clock are correctly updated/not changed on call to update(), depending on + * target time and clock's own time ( = system time + current skew). + */ @Test public void testSystemMonotonicUpdate() { + Clock.SystemMonotonic systemMonotonicClock = new Clock.SystemMonotonic(MOCK_CLOCK); MonotonicityCheckerClock systemMonotonic = - new MonotonicityCheckerClock(new Clock.SystemMonotonic(PHYSICAL_CLOCK, 30000), false); + new MonotonicityCheckerClock(systemMonotonicClock, false); // Sets internal time - when(PHYSICAL_CLOCK.now()).thenReturn(99L); + when(MOCK_CLOCK.now()).thenReturn(99L); assertEquals(99, systemMonotonic.now()); // case 1: Message timestamp is greater than current System Monotonic Time, // physical time at 100 still. - when(PHYSICAL_CLOCK.now()).thenReturn(100L); + when(MOCK_CLOCK.now()).thenReturn(100L); assertEquals(102, systemMonotonic.update(102)); // case 2: Message timestamp is greater than current System Monotonic Time, // physical time at 100 still. 
- when(PHYSICAL_CLOCK.now()).thenReturn(100L); + when(MOCK_CLOCK.now()).thenReturn(100L); assertEquals(103, systemMonotonic.update(103)); // case 3: Message timestamp is less than current System Monotonic Time, greater than current @@ -137,11 +144,11 @@ public class TestClock { assertEquals(103, systemMonotonic.update(99)); // case 5: Message timestamp<System monotonic time and both less than current Physical Time - when(PHYSICAL_CLOCK.now()).thenReturn(106L); + when(MOCK_CLOCK.now()).thenReturn(106L); assertEquals(106, systemMonotonic.update(102)); // case 6: Message timestamp>System monotonic time and both less than current Physical Time - when(PHYSICAL_CLOCK.now()).thenReturn(109L); + when(MOCK_CLOCK.now()).thenReturn(109L); assertEquals(109, systemMonotonic.update(108)); systemMonotonic.assertMonotonic(); @@ -151,10 +158,10 @@ public class TestClock { public void testSystemMonotonicUpdateMaxClockSkew() throws Clock.ClockException { final long time = 100L; Clock.SystemMonotonic systemMonotonic = - new Clock.SystemMonotonic(PHYSICAL_CLOCK, MAX_CLOCK_SKEW_IN_MS); + new Clock.SystemMonotonic(MOCK_CLOCK, MAX_CLOCK_SKEW_IN_MS); // Set Current Time. - when(PHYSICAL_CLOCK.now()).thenReturn(time); + when(MOCK_CLOCK.now()).thenReturn(time); systemMonotonic.now(); // Shouldn't throw ClockException @@ -173,28 +180,29 @@ public class TestClock { @Test public void testHLCNow() throws Clock.ClockException { - MonotonicityCheckerClock hybridLogicalClock = - new MonotonicityCheckerClock(new Clock.HLC(PHYSICAL_CLOCK, 30000), true); + MonotonicityCheckerClock hybridLogicalClock = new MonotonicityCheckerClock( + new Clock.HLC(new Clock.SystemMonotonic(MOCK_CLOCK)), + true); // true for strict monotonicity // case 1: Test if it returns correct time based on current physical time. 
// Remember, initially logical time = 0 - when(PHYSICAL_CLOCK.now()).thenReturn(100L); + when(MOCK_CLOCK.now()).thenReturn(100L); assertHLCTime(hybridLogicalClock.now(), 100, 0); // case 2: physical time doesn't change, logical time should increment. - when(PHYSICAL_CLOCK.now()).thenReturn(100L); + when(MOCK_CLOCK.now()).thenReturn(100L); assertHLCTime(hybridLogicalClock.now(), 100, 1); // case 3: physical time doesn't change still, logical time should increment again - when(PHYSICAL_CLOCK.now()).thenReturn(100L); + when(MOCK_CLOCK.now()).thenReturn(100L); assertHLCTime(hybridLogicalClock.now(), 100, 2); // case 4: physical time moves forward, logical time should reset to 0. - when(PHYSICAL_CLOCK.now()).thenReturn(101L); + when(MOCK_CLOCK.now()).thenReturn(101L); assertHLCTime(hybridLogicalClock.now(), 101, 0); // case 5: Monotonic increasing check, physical time goes back. - when(PHYSICAL_CLOCK.now()).thenReturn(99L); + when(MOCK_CLOCK.now()).thenReturn(99L); assertHLCTime(hybridLogicalClock.now(), 101, 1); hybridLogicalClock.assertMonotonic(); @@ -202,55 +210,57 @@ public class TestClock { @Test public void testHLCNowLogicalTimeOverFlow() { - Clock.HLC hybridLogicalClock = new Clock.HLC(PHYSICAL_CLOCK, 100); - - // Set Current Time. - when(PHYSICAL_CLOCK.now()).thenReturn(100L); - hybridLogicalClock.setPhysicalTime(100); - hybridLogicalClock.setLogicalTime(TimestampType.HYBRID.getMaxLogicalTime()); + Clock.HLC hybridLogicalClock = new Clock.HLC(new Clock.SystemMonotonic(MOCK_CLOCK)); + when(MOCK_CLOCK.now()).thenReturn(100L); + hybridLogicalClock.now(); // current time (100, 0) + for (int i = 0; i < TimestampType.HYBRID.getMaxLogicalTime() - 1; i++) { + hybridLogicalClock.now(); + } exception.expect(Clock.ClockException.class); hybridLogicalClock.now(); } + // No need to check skews in this test, since they are member of SystemMonotonic and not HLC. 
@Test public void testHLCUpdate() throws Clock.ClockException { long messageTimestamp; - MonotonicityCheckerClock hybridLogicalClock = - new MonotonicityCheckerClock(new Clock.HLC(PHYSICAL_CLOCK, 100), true); + MonotonicityCheckerClock hybridLogicalClock = new MonotonicityCheckerClock( + new Clock.HLC(new Clock.SystemMonotonic(MOCK_CLOCK)), + true); // true for strictly increasing check // Set Current Time. - when(PHYSICAL_CLOCK.now()).thenReturn(100L); + when(MOCK_CLOCK.now()).thenReturn(100L); hybridLogicalClock.now(); // case 1: Message physical timestamp is lower than current physical time. messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 99, 1); - when(PHYSICAL_CLOCK.now()).thenReturn(101L); + when(MOCK_CLOCK.now()).thenReturn(101L); assertHLCTime(hybridLogicalClock.update(messageTimestamp), 101, 0); // case 2: Message physical timestamp is greater than HLC physical time. messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 3); - when(PHYSICAL_CLOCK.now()).thenReturn(102L); + when(MOCK_CLOCK.now()).thenReturn(102L); assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 4); // case 3: Message timestamp is less than HLC timestamp messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 104 , 4); - when(PHYSICAL_CLOCK.now()).thenReturn(103L); + when(MOCK_CLOCK.now()).thenReturn(103L); assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 5); //case 4: Message timestamp with same physical time as HLC, but lower logical time messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 2); - when(PHYSICAL_CLOCK.now()).thenReturn(101L); + when(MOCK_CLOCK.now()).thenReturn(101L); assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 6); //case 5: Message timestamp with same physical time as HLC, but higher logical time messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 8); - when(PHYSICAL_CLOCK.now()).thenReturn(102L); 
+ when(MOCK_CLOCK.now()).thenReturn(102L); assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 9); //case 6: Actual Physical Time greater than message physical timestamp and HLC physical time. messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 10); - when(PHYSICAL_CLOCK.now()).thenReturn(110L); + when(MOCK_CLOCK.now()).thenReturn(110L); assertHLCTime(hybridLogicalClock.update(messageTimestamp), 110, 0); hybridLogicalClock.assertMonotonic(); @@ -259,10 +269,10 @@ public class TestClock { @Test public void testHLCUpdateLogicalTimeOverFlow() throws Clock.ClockException { long messageTimestamp; - Clock.HLC hybridLogicalClock = new Clock.HLC(PHYSICAL_CLOCK, 100); + Clock.HLC hybridLogicalClock = new Clock.HLC(new Clock.SystemMonotonic(MOCK_CLOCK)); // Set Current Time. - when(PHYSICAL_CLOCK.now()).thenReturn(100L); + when(MOCK_CLOCK.now()).thenReturn(100L); hybridLogicalClock.now(); messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 100, @@ -275,10 +285,11 @@ public class TestClock { public void testHLCUpdateMaxClockSkew() throws Clock.ClockException { final long time = 100; long messageTimestamp; - Clock.HLC hybridLogicalClock = new Clock.HLC(PHYSICAL_CLOCK, MAX_CLOCK_SKEW_IN_MS); + Clock.HLC hybridLogicalClock = new Clock.HLC( + new Clock.SystemMonotonic(MOCK_CLOCK, MAX_CLOCK_SKEW_IN_MS)); // Set Current Time. 
- when(PHYSICAL_CLOCK.now()).thenReturn(time); + when(MOCK_CLOCK.now()).thenReturn(time); hybridLogicalClock.now(); messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, http://git-wip-us.apache.org/repos/asf/hbase/blob/d9a99049/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d8d87f8..7239a29 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -598,8 +598,10 @@ public class HRegionServer extends HasThread implements this.abortRequested = false; this.stopped = false; - this.hybridLogicalClock = new Clock.HLC(); - this.systemMonotonicClock = new Clock.SystemMonotonic(); + final long maxClockSkew = + conf.getLong("hbase.max.clock.skew.in.ms", Clock.DEFAULT_MAX_CLOCK_SKEW_IN_MS); + this.hybridLogicalClock = new Clock.HLC(maxClockSkew); + this.systemMonotonicClock = new Clock.SystemMonotonic(maxClockSkew); this.systemClock = new Clock.System(); rpcServices = createRpcServices(); http://git-wip-us.apache.org/repos/asf/hbase/blob/d9a99049/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java index bc95f46..ea46090 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase; import static org.junit.Assert.*; 
+import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -198,18 +199,19 @@ public class TestClockWithCluster { assertNotNull(regionMeta); // Inject physical clock that always returns same physical time into hybrid logical clock - long systemTime = Clock.DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK.now(); - Clock.PhysicalClock physicalClock = mock(Clock.PhysicalClock.class); - when(physicalClock.now()).thenReturn(systemTime); - when(physicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS); - Clock.HLC clock = new Clock.HLC(physicalClock, Clock.DEFAULT_MAX_CLOCK_SKEW_IN_MS); + long systemTime = Clock.SYSTEM_CLOCK.now(); + Clock mockSystemClock = mock(Clock.class); + when(mockSystemClock.now()).thenReturn(systemTime); + when(mockSystemClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS); + when(mockSystemClock.isMonotonic()).thenReturn(true); + Clock.HLC masterHLC = new Clock.HLC(new Clock.SystemMonotonic(mockSystemClock)); // The region clock is used for setting timestamps for table mutations and the region server // clock is used for updating the clock on region assign/unassign events. 
// Set meta region clock so that region state transitions are timestamped with mocked clock - regionMeta.setClock(clock); - master.setClock(clock); + regionMeta.setClock(masterHLC); + master.setClock(masterHLC); HRegion userRegion = null; for (Region region : regions) { @@ -221,13 +223,13 @@ public class TestClockWithCluster { // Only mock the region server clock because the region clock does not get used during // unassignment and assignment - rs.setClock(clock); + rs.setClock(masterHLC); // Repeatedly unassign and assign region while tracking the timestamps of the region state // transitions from the meta table List<Long> timestamps = new ArrayList<>(); // Set expected logical time to 0 as initial clock.now() sets clock's logical time to 0 - long expectedLogicalTime = TimestampType.HYBRID.getLogicalTime(clock.now()); + long expectedLogicalTime = TimestampType.HYBRID.getLogicalTime(masterHLC.now()); for (int i = 0; i < 10; i++) { admin.unassign(hriOnline.getRegionName(), false); assertEquals(RegionState.State.CLOSED, regionStates.getRegionState(hriOnline).getState()); @@ -243,8 +245,8 @@ public class TestClockWithCluster { // 8,9 [now] Update hbase:meta expectedLogicalTime += 10; - assertEquals(expectedLogicalTime, clock.getLogicalTime()); - timestamps.add(clock.getLogicalTime()); + assertEquals(expectedLogicalTime, masterHLC.getLogicalTime()); + timestamps.add(masterHLC.getLogicalTime()); admin.assign(hriOnline.getRegionName()); // clock.now() is called 7 times and clock.update() is called 2 times, each call increments @@ -260,8 +262,8 @@ public class TestClockWithCluster { // gets the region info from assignment manager rather than meta table accessor expectedLogicalTime += 9; assertEquals(RegionState.State.OPEN, regionStates.getRegionState(hriOnline).getState()); - assertEquals(expectedLogicalTime, clock.getLogicalTime()); - timestamps.add(clock.getLogicalTime()); + assertEquals(expectedLogicalTime, masterHLC.getLogicalTime()); + 
timestamps.add(masterHLC.getLogicalTime()); } // Ensure that the hybrid timestamps are strictly increasing @@ -305,62 +307,63 @@ public class TestClockWithCluster { assertNotNull(regionMeta); // Instantiate two hybrid logical clocks with mocked physical clocks - long expectedPhysicalTime = Clock.DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK.now(); - Clock.PhysicalClock masterPhysicalClock = mock(Clock.PhysicalClock.class); - when(masterPhysicalClock.now()).thenReturn(expectedPhysicalTime); - when(masterPhysicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS); - Clock.HLC masterClock = new Clock.HLC(masterPhysicalClock, Clock.DEFAULT_MAX_CLOCK_SKEW_IN_MS); - master.setClock(masterClock); - regionMeta.setClock(masterClock); - - Clock.PhysicalClock rsPhysicalClock = mock(Clock.PhysicalClock.class); - when(rsPhysicalClock.now()).thenReturn(expectedPhysicalTime); - when(rsPhysicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS); - Clock.HLC rsClock = new Clock.HLC(rsPhysicalClock, Clock.DEFAULT_MAX_CLOCK_SKEW_IN_MS); + long expectedPhysicalTime = Clock.SYSTEM_CLOCK.now(); + Clock masterMockSystemClock = mock(Clock.class); + when(masterMockSystemClock.now()).thenReturn(expectedPhysicalTime); + when(masterMockSystemClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS); + Clock.HLC masterHLC= new Clock.HLC(new Clock.SystemMonotonic(masterMockSystemClock)); + master.setClock(masterHLC); + regionMeta.setClock(masterHLC); + + Clock rsMockSystemClock = mock(Clock.class); + when(rsMockSystemClock.now()).thenReturn(expectedPhysicalTime); + when(rsMockSystemClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS); + when(rsMockSystemClock.isMonotonic()).thenReturn(true); + Clock.HLC rsHLC= new Clock.HLC(new Clock.SystemMonotonic(rsMockSystemClock)); // We only mock the region server clock here because the region clock does not get used // during unassignment and assignment - rs.setClock(rsClock); + rs.setClock(rsHLC); // Increment master physical clock time expectedPhysicalTime 
+= 1000; - when(masterPhysicalClock.now()).thenReturn(expectedPhysicalTime); + when(masterMockSystemClock.now()).thenReturn(expectedPhysicalTime); // Unassign region, region server should advance its clock upon receiving close region request admin.unassign(hriOnline.getRegionName(), false); assertEquals(RegionState.State.CLOSED, regionStates.getRegionState(hriOnline).getState()); // Verify that region server clock time increased // Previous test has explanation for each event that increases logical time - assertHLCTime(masterClock, expectedPhysicalTime, 9); - assertHLCTime(rsClock, expectedPhysicalTime, 6); + assertHLCTime(masterHLC, expectedPhysicalTime, 9); + assertHLCTime(rsHLC, expectedPhysicalTime, 6); // Increase region server physical clock time expectedPhysicalTime += 1000; - when(rsPhysicalClock.now()).thenReturn(expectedPhysicalTime); + when(rsMockSystemClock.now()).thenReturn(expectedPhysicalTime); // Assign region, master server should advance its clock upon receiving close region response admin.assign(hriOnline.getRegionName()); assertEquals(RegionState.State.OPEN, regionStates.getRegionState(hriOnline).getState()); // Verify that master clock time increased - assertHLCTime(masterClock, expectedPhysicalTime, 4); - assertHLCTime(rsClock, expectedPhysicalTime, 1); + assertHLCTime(masterHLC, expectedPhysicalTime, 4); + assertHLCTime(rsHLC, expectedPhysicalTime, 1); // Increment region server physical clock time expectedPhysicalTime += 1000; - when(rsPhysicalClock.now()).thenReturn(expectedPhysicalTime); + when(rsMockSystemClock.now()).thenReturn(expectedPhysicalTime); // Unassign region, region server should advance its clock upon receiving close region request admin.unassign(hriOnline.getRegionName(), false); assertEquals(RegionState.State.CLOSED, regionStates.getRegionState(hriOnline).getState()); // Verify that master server clock time increased - assertHLCTime(masterClock, expectedPhysicalTime, 4); - assertHLCTime(rsClock, expectedPhysicalTime, 1); + 
assertHLCTime(masterHLC, expectedPhysicalTime, 4); + assertHLCTime(rsHLC, expectedPhysicalTime, 1); // Increase master server physical clock time expectedPhysicalTime += 1000; - when(masterPhysicalClock.now()).thenReturn(expectedPhysicalTime); + when(masterMockSystemClock.now()).thenReturn(expectedPhysicalTime); // Assign region, master server should advance its clock upon receiving close region response admin.assign(hriOnline.getRegionName()); assertEquals(RegionState.State.OPEN, regionStates.getRegionState(hriOnline).getState()); // Verify that region server clock time increased - assertHLCTime(masterClock, expectedPhysicalTime, 8); - assertHLCTime(rsClock, expectedPhysicalTime, 5); + assertHLCTime(masterHLC, expectedPhysicalTime, 8); + assertHLCTime(rsHLC, expectedPhysicalTime, 5); } } \ No newline at end of file