Author: vinodkv
Date: Tue Mar 4 17:34:01 2014
New Revision: 1574146
URL: http://svn.apache.org/r1574146
Log:
YARN-1730. Implemented simple write-locking in the LevelDB based
timeline-store. Contributed by Billie Rinaldi.
svn merge --ignore-ancestry -c 1574145 ../../trunk/
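The core of the change is a reference-counted, per-entity write lock: writers touching the same entity are serialized, writers of different entities proceed in parallel, and a lock is discarded as soon as its last holder returns it. A minimal, self-contained sketch of that pattern follows; class and method names here are illustrative, not the committed LeveldbTimelineStore internals.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantLock;

    // Sketch of a reference-counted per-key lock map: getLock() hands out one
    // ReentrantLock per key and counts outstanding holders, returnLock() drops
    // the count and removes the lock once nobody references it, so the map only
    // holds entries for keys currently being written.
    public class PerKeyLockSketch {
      static class CountingLock<K> extends ReentrantLock {
        int count;
        final K key;
        CountingLock(K key) { this.key = key; }
      }

      static class LockMap<K> {
        private final Map<K, CountingLock<K>> locks =
            new HashMap<K, CountingLock<K>>();

        synchronized CountingLock<K> getLock(K key) {
          CountingLock<K> lock = locks.get(key);
          if (lock == null) {
            lock = new CountingLock<K>(key);
            locks.put(key, lock);
          }
          lock.count++;
          return lock;
        }

        synchronized void returnLock(CountingLock<K> lock) {
          if (--lock.count == 0) {
            locks.remove(lock.key);
          }
        }
      }

      public static void main(String[] args) {
        LockMap<String> writeLocks = new LockMap<String>();
        CountingLock<String> lock = writeLocks.getLock("entity-1/TYPE_A");
        lock.lock();
        try {
          // critical section: read-modify-write of entity-1's records
        } finally {
          lock.unlock();
          writeLocks.returnLock(lock);
        }
      }
    }

Reference counting keeps the lock map bounded by the number of entities currently being written rather than by the number of entities ever seen.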
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1574146&r1=1574145&r2=1574146&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Tue Mar 4 17:34:01 2014
@@ -234,6 +234,9 @@ Release 2.4.0 - UNRELEASED
YARN-1765. Added test cases to verify that killApplication API works across
ResourceManager failover. (Xuan Gong via vinodkv)
+ YARN-1730. Implemented simple write-locking in the LevelDB based timeline-
+ store. (Billie Rinaldi via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1574146&r1=1574145&r2=1574146&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Tue Mar 4 17:34:01 2014
@@ -1073,9 +1073,22 @@ public class YarnConfiguration extends C
public static final String TIMELINE_SERVICE_STORE =
TIMELINE_SERVICE_PREFIX + "store-class";
+ public static final String TIMELINE_SERVICE_LEVELDB_PREFIX =
+ TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.";
+
/** Timeline service leveldb path */
public static final String TIMELINE_SERVICE_LEVELDB_PATH =
- TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.path";
+ TIMELINE_SERVICE_LEVELDB_PREFIX + "path";
+
+ /** Timeline service leveldb start time read cache (number of entities) */
+ public static final String
+ TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE =
+ TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-read-cache-size";
+
+ /** Timeline service leveldb start time write cache (number of entities) */
+ public static final String
+ TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE =
+ TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-write-cache-size";
////////////////////////////////
// Other Configs
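The two new keys default to the previous 10000-entry behaviour and can be tuned independently of each other. A hedged sketch of overriding them programmatically follows; the concrete values are arbitrary examples, and under the existing yarn.timeline-service. prefix these constants should resolve to yarn.timeline-service.leveldb-timeline-store.start-time-read-cache-size and ...start-time-write-cache-size.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    // Sketch: tune the new start-time caches; values below are example numbers only.
    public class TimelineCacheSizeExample {
      public static void main(String[] args) {
        Configuration conf = new YarnConfiguration();
        // Larger read cache for read-heavy history queries (example value).
        conf.setInt(
            YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
            50000);
        // Smaller write cache if relatively few entities are written concurrently.
        conf.setInt(
            YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
            5000);
        // The same keys can be set in yarn-site.xml instead of in code.
      }
    }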
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java?rev=1574146&r1=1574145&r2=1574146&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java Tue Mar 4 17:34:01 2014
@@ -33,6 +33,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections.map.LRUMap;
@@ -84,11 +85,17 @@ public class LeveldbTimelineStore extend
private static final byte[] EMPTY_BYTES = new byte[0];
- private static final int START_TIME_CACHE_SIZE = 10000;
+ private static final int DEFAULT_START_TIME_READ_CACHE_SIZE = 10000;
+ private static final int DEFAULT_START_TIME_WRITE_CACHE_SIZE = 10000;
- @SuppressWarnings("unchecked")
- private final Map<EntityIdentifier, Long> startTimeCache =
- Collections.synchronizedMap(new LRUMap(START_TIME_CACHE_SIZE));
+ private Map<EntityIdentifier, Long> startTimeWriteCache;
+ private Map<EntityIdentifier, Long> startTimeReadCache;
+
+ /**
+ * Per-entity locks are obtained when writing.
+ */
+ private final LockMap<EntityIdentifier> writeLocks =
+ new LockMap<EntityIdentifier>();
private DB db;
@@ -97,6 +104,7 @@ public class LeveldbTimelineStore extend
}
@Override
+ @SuppressWarnings("unchecked")
protected void serviceInit(Configuration conf) throws Exception {
Options options = new Options();
options.createIfMissing(true);
@@ -109,6 +117,12 @@ public class LeveldbTimelineStore extend
"timeline store " + path);
LOG.info("Using leveldb path " + path);
db = factory.open(new File(path, FILENAME), options);
+ startTimeWriteCache =
+ Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
+ conf)));
+ startTimeReadCache =
+ Collections.synchronizedMap(new LRUMap(getStartTimeReadCacheSize(
+ conf)));
super.serviceInit(conf);
}
@@ -118,6 +132,45 @@ public class LeveldbTimelineStore extend
super.serviceStop();
}
+ private static class LockMap<K> {
+ private static class CountingReentrantLock<K> extends ReentrantLock {
+ private int count;
+ private K key;
+
+ CountingReentrantLock(K key) {
+ super();
+ this.count = 0;
+ this.key = key;
+ }
+ }
+
+ private Map<K, CountingReentrantLock<K>> locks =
+ new HashMap<K, CountingReentrantLock<K>>();
+
+ synchronized CountingReentrantLock<K> getLock(K key) {
+ CountingReentrantLock<K> lock = locks.get(key);
+ if (lock == null) {
+ lock = new CountingReentrantLock<K>(key);
+ locks.put(key, lock);
+ }
+
+ lock.count++;
+ return lock;
+ }
+
+ synchronized void returnLock(CountingReentrantLock<K> lock) {
+ if (lock.count == 0) {
+ throw new IllegalStateException("Returned lock more times than it " +
+ "was retrieved");
+ }
+ lock.count--;
+
+ if (lock.count == 0) {
+ locks.remove(lock.key);
+ }
+ }
+ }
+
private static class KeyBuilder {
private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
private byte[][] b;
@@ -214,7 +267,7 @@ public class LeveldbTimelineStore extend
EnumSet<Field> fields) throws IOException {
DBIterator iterator = null;
try {
- byte[] revStartTime = getStartTime(entityId, entityType, null, null, null);
+ byte[] revStartTime = getStartTime(entityId, entityType);
if (revStartTime == null)
return null;
byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
@@ -338,7 +391,7 @@ public class LeveldbTimelineStore extend
// look up start times for the specified entities
// skip entities with no start time
for (String entity : entityIds) {
- byte[] startTime = getStartTime(entity, entityType, null, null, null);
+ byte[] startTime = getStartTime(entity, entityType);
if (startTime != null) {
List<EntityIdentifier> entities = startTimeMap.get(startTime);
if (entities == null) {
@@ -529,12 +582,16 @@ public class LeveldbTimelineStore extend
* response.
*/
private void put(TimelineEntity entity, TimelinePutResponse response) {
+ LockMap.CountingReentrantLock<EntityIdentifier> lock =
+ writeLocks.getLock(new EntityIdentifier(entity.getEntityId(),
+ entity.getEntityType()));
+ lock.lock();
WriteBatch writeBatch = null;
try {
writeBatch = db.createWriteBatch();
List<TimelineEvent> events = entity.getEvents();
// look up the start time for the entity
- byte[] revStartTime = getStartTime(entity.getEntityId(),
+ byte[] revStartTime = getAndSetStartTime(entity.getEntityId(),
entity.getEntityType(), entity.getStartTime(), events,
writeBatch);
if (revStartTime == null) {
@@ -571,7 +628,7 @@ public class LeveldbTimelineStore extend
String relatedEntityType = relatedEntityList.getKey();
for (String relatedEntityId : relatedEntityList.getValue()) {
// look up start time of related entity
- byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
+ byte[] relatedEntityStartTime = getAndSetStartTime(relatedEntityId,
relatedEntityType, null, null, writeBatch);
if (relatedEntityStartTime == null) {
// if start time is not found, set start time of the related
@@ -580,7 +637,7 @@ public class LeveldbTimelineStore extend
relatedEntityStartTime = revStartTime;
writeBatch.put(createStartTimeLookupKey(relatedEntityId,
relatedEntityType), relatedEntityStartTime);
- startTimeCache.put(new EntityIdentifier(relatedEntityId,
+ startTimeWriteCache.put(new EntityIdentifier(relatedEntityId,
relatedEntityType), revStartTimeLong);
}
// write reverse entry (related entity -> entity)
@@ -629,6 +686,8 @@ public class LeveldbTimelineStore extend
error.setErrorCode(TimelinePutError.IO_EXCEPTION);
response.addError(error);
} finally {
+ lock.unlock();
+ writeLocks.returnLock(lock);
IOUtils.cleanup(LOG, writeBatch);
}
}
@@ -666,6 +725,39 @@ public class LeveldbTimelineStore extend
*
* @param entityId The id of the entity
* @param entityType The type of the entity
+ * @return A byte array
+ * @throws IOException
+ */
+ private byte[] getStartTime(String entityId, String entityType)
+ throws IOException {
+ EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
+ // start time is not provided, so try to look it up
+ if (startTimeReadCache.containsKey(entity)) {
+ // found the start time in the cache
+ return writeReverseOrderedLong(startTimeReadCache.get(entity));
+ } else {
+ // try to look up the start time in the db
+ byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+ byte[] v = db.get(b);
+ if (v == null) {
+ // did not find the start time in the db
+ return null;
+ } else {
+ // found the start time in the db
+ startTimeReadCache.put(entity, readReverseOrderedLong(v, 0));
+ return v;
+ }
+ }
+ }
+
+ /**
+ * Get the unique start time for a given entity as a byte array that sorts
+ * the timestamps in reverse order (see {@link
+ * GenericObjectMapper#writeReverseOrderedLong(long)}). If the start time
+ * doesn't exist, set it based on the information provided.
+ *
+ * @param entityId The id of the entity
+ * @param entityType The type of the entity
* @param startTime The start time of the entity, or null
* @param events A list of events for the entity, or null
* @param writeBatch A leveldb write batch, if the method is called by a
@@ -673,62 +765,76 @@ public class LeveldbTimelineStore extend
* @return A byte array
* @throws IOException
*/
- private byte[] getStartTime(String entityId, String entityType,
+ private byte[] getAndSetStartTime(String entityId, String entityType,
Long startTime, List<TimelineEvent> events, WriteBatch writeBatch)
throws IOException {
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
if (startTime == null) {
// start time is not provided, so try to look it up
- if (startTimeCache.containsKey(entity)) {
+ if (startTimeWriteCache.containsKey(entity)) {
// found the start time in the cache
- startTime = startTimeCache.get(entity);
+ startTime = startTimeWriteCache.get(entity);
+ return writeReverseOrderedLong(startTime);
} else {
- // try to look up the start time in the db
- byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
- byte[] v = db.get(b);
- if (v == null) {
- // did not find the start time in the db
- // if this is a put, try to set it from the provided events
- if (events == null || writeBatch == null) {
- // no events, or not a put, so return null
- return null;
- }
+ if (events != null) {
+ // prepare a start time from events in case it is needed
Long min = Long.MAX_VALUE;
- for (TimelineEvent e : events)
- if (min > e.getTimestamp())
+ for (TimelineEvent e : events) {
+ if (min > e.getTimestamp()) {
min = e.getTimestamp();
- startTime = min;
- // selected start time as minimum timestamp of provided events
- // write start time to db and cache
- writeBatch.put(b, writeReverseOrderedLong(startTime));
- startTimeCache.put(entity, startTime);
- } else {
- // found the start time in the db
- startTime = readReverseOrderedLong(v, 0);
- if (writeBatch != null) {
- // if this is a put, re-add the start time to the cache
- startTimeCache.put(entity, startTime);
+ }
}
+ startTime = min;
}
+ return checkStartTimeInDb(entity, startTime, writeBatch);
}
} else {
// start time is provided
- // TODO: verify start time in db as well as cache?
- if (startTimeCache.containsKey(entity)) {
- // if the start time is already in the cache,
- // and it is different from the provided start time,
- // use the one from the cache
- if (!startTime.equals(startTimeCache.get(entity)))
- startTime = startTimeCache.get(entity);
- } else if (writeBatch != null) {
- // if this is a put, write the provided start time to the db and the
- // cache
- byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
- writeBatch.put(b, writeReverseOrderedLong(startTime));
- startTimeCache.put(entity, startTime);
+ if (startTimeWriteCache.containsKey(entity)) {
+ // check the provided start time matches the cache
+ if (!startTime.equals(startTimeWriteCache.get(entity))) {
+ // the start time is already in the cache,
+ // and it is different from the provided start time,
+ // so use the one from the cache
+ startTime = startTimeWriteCache.get(entity);
+ }
+ return writeReverseOrderedLong(startTime);
+ } else {
+ // check the provided start time matches the db
+ return checkStartTimeInDb(entity, startTime, writeBatch);
}
}
- return writeReverseOrderedLong(startTime);
+ }
+
+ /**
+ * Checks db for start time and returns it if it exists. If it doesn't
+ * exist, writes the suggested start time (if it is not null). This is
+ * only called when the start time is not found in the cache,
+ * so it adds it back into the cache if it is found.
+ */
+ private byte[] checkStartTimeInDb(EntityIdentifier entity,
+ Long suggestedStartTime, WriteBatch writeBatch) throws IOException {
+ // create lookup key for start time
+ byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+ // retrieve value for key
+ byte[] v = db.get(b);
+ byte[] revStartTime;
+ if (v == null) {
+ // start time doesn't exist in db
+ if (suggestedStartTime == null) {
+ return null;
+ }
+ // write suggested start time
+ revStartTime = writeReverseOrderedLong(suggestedStartTime);
+ writeBatch.put(b, revStartTime);
+ } else {
+ // found start time in db, so ignore suggested start time
+ suggestedStartTime = readReverseOrderedLong(v, 0);
+ revStartTime = v;
+ }
+ startTimeWriteCache.put(entity, suggestedStartTime);
+ startTimeReadCache.put(entity, suggestedStartTime);
+ return revStartTime;
}
/**
@@ -868,6 +974,21 @@ public class LeveldbTimelineStore extend
*/
@VisibleForTesting
void clearStartTimeCache() {
- startTimeCache.clear();
+ startTimeWriteCache.clear();
+ startTimeReadCache.clear();
+ }
+
+ @VisibleForTesting
+ static int getStartTimeReadCacheSize(Configuration conf) {
+ return conf.getInt(
+ YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+ DEFAULT_START_TIME_READ_CACHE_SIZE);
+ }
+
+ @VisibleForTesting
+ static int getStartTimeWriteCacheSize(Configuration conf) {
+ return conf.getInt(
+ YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+ DEFAULT_START_TIME_WRITE_CACHE_SIZE);
}
}
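Splitting getStartTime (read path) from getAndSetStartTime/checkStartTimeInDb (write path) is the other half of the change: lookups only populate a read cache, while puts may derive a start time from the minimum event timestamp, persist it, and refresh both caches. Below is a simplified, self-contained sketch of that flow, using a ConcurrentHashMap in place of leveldb and plain Longs in place of the reverse-ordered byte encoding; it is illustrative only, not the committed code.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch of the split read/write start-time resolution described above.
    public class StartTimeResolutionSketch {
      private final Map<String, Long> db = new ConcurrentHashMap<String, Long>();
      private final Map<String, Long> readCache =
          Collections.synchronizedMap(new HashMap<String, Long>());
      private final Map<String, Long> writeCache =
          Collections.synchronizedMap(new HashMap<String, Long>());

      // Read path: never writes to the store, only fills the read cache.
      Long getStartTime(String entity) {
        Long cached = readCache.get(entity);
        if (cached != null) {
          return cached;
        }
        Long stored = db.get(entity);
        if (stored != null) {
          readCache.put(entity, stored);
        }
        return stored;
      }

      // Write path: prefer the write cache, then the store; otherwise fall back
      // to the suggested time or the smallest event timestamp, persist it, and
      // populate both caches.
      Long getAndSetStartTime(String entity, Long suggested, List<Long> eventTimestamps) {
        Long cached = writeCache.get(entity);
        if (cached != null) {
          return cached;
        }
        Long stored = db.get(entity);
        if (stored == null) {
          Long candidate = suggested;
          if (candidate == null && eventTimestamps != null && !eventTimestamps.isEmpty()) {
            candidate = Collections.min(eventTimestamps);
          }
          if (candidate == null) {
            return null;
          }
          db.put(entity, candidate);
          stored = candidate;
        }
        writeCache.put(entity, stored);
        readCache.put(entity, stored);
        return stored;
      }
    }

Because the read path never writes to the store, only put() needs to hold the per-entity write lock introduced by this change.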
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java?rev=1574146&r1=1574145&r2=1574146&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java Tue Mar 4 17:34:01 2014
@@ -30,6 +30,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TestLeveldbTimelineStore
@@ -64,6 +66,7 @@ public class TestLeveldbTimelineStore
super.testGetSingleEntity();
((LeveldbTimelineStore)store).clearStartTimeCache();
super.testGetSingleEntity();
+ loadTestData();
}
@Test
@@ -86,4 +89,20 @@ public class TestLeveldbTimelineStore
super.testGetEvents();
}
+ @Test
+ public void testCacheSizes() {
+ Configuration conf = new Configuration();
+ assertEquals(10000, LeveldbTimelineStore.getStartTimeReadCacheSize(conf));
+ assertEquals(10000, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
+ conf.setInt(
+ YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+ 10001);
+ assertEquals(10001, LeveldbTimelineStore.getStartTimeReadCacheSize(conf));
+ conf = new Configuration();
+ conf.setInt(
+ YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+ 10002);
+ assertEquals(10002, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
+ }
+
}