jiajunwang commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r334209387
 
 

 ##########
 File path: helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##########
 @@ -202,126 +218,156 @@ public void disconnect() {
     if (!_zkClient.isClosed()) {
       _zkClient.close();
     }
-    if (!_znRecordClient.isClosed()) {
-      _znRecordClient.close();
-    }
   }
 
   private HelixProperty compressedBucketRead(String path) {
-    // TODO: Incorporate parallelism into reads instead of locking the whole thing against other
-    // reads and writes
-    if (tryLock(path)) {
-      try {
-        // Retrieve the metadata
-        ZNRecord metadataRecord =
-            _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-        if (metadataRecord == null) {
-          throw new ZkNoNodeException(
-              String.format("Metadata ZNRecord does not exist for path: %s", 
path));
-        }
-
-        int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1);
-        int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1);
-        int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, -1);
-        if (lastSuccessIndex == -1) {
-          throw new HelixException(String.format("Metadata ZNRecord does not 
have %s! Path: %s",
-              LAST_SUCCESS_KEY, path));
-        }
-        if (bucketSize == -1) {
-          throw new HelixException(
-              String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
-        }
-        if (dataSize == -1) {
-          throw new HelixException(
-              String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
-        }
-
-        // Compute N - number of buckets
-        int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
-        byte[] compressedRecord = new byte[dataSize];
-        String dataPath = path + "/" + lastSuccessIndex;
+    // 1. Get the version to read
+    byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + LAST_SUCCESSFUL_WRITE_KEY,
+        null, AccessOption.PERSISTENT);
+    if (binaryVersionToRead == null) {
+      throw new ZkNoNodeException(
+          String.format("Last successful write ZNode does not exist for path: 
%s", path));
+    }
+    String versionToRead = new String(binaryVersionToRead);
+
+    // 2. Get the metadata map
+    byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead + "/" + METADATA_KEY,
+        null, AccessOption.PERSISTENT);
+    if (binaryMetadata == null) {
+      throw new ZkNoNodeException(
+          String.format("Metadata ZNode does not exist for path: %s", path));
+    }
+    Map metadata;
+    try {
+      metadata = OBJECT_MAPPER.readValue(binaryMetadata, Map.class);
+    } catch (IOException e) {
+      throw new HelixException(String.format("Failed to deserialize path 
metadata: %s!", path), e);
+    }
 
-        List<String> paths = new ArrayList<>();
-        for (int i = 0; i < numBuckets; i++) {
-          paths.add(dataPath + "/" + i);
-        }
+    // 3. Read the data
+    Object bucketSizeObj = metadata.get(BUCKET_SIZE_KEY);
+    Object dataSizeObj = metadata.get(DATA_SIZE_KEY);
+    if (bucketSizeObj == null) {
+      throw new HelixException(
+          String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
+    }
+    if (dataSizeObj == null) {
+      throw new HelixException(
+          String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
+    }
+    int bucketSize = Integer.parseInt((String) bucketSizeObj);
+    int dataSize = Integer.parseInt((String) dataSizeObj);
 
-        // Async get
-        List buckets = _zkBaseDataAccessor.get(paths, null, AccessOption.PERSISTENT, true);
-
-        // Combine buckets into one byte array
-        int copyPtr = 0;
-        for (int i = 0; i < numBuckets; i++) {
-          if (i == numBuckets - 1) {
-            // Special treatment for the last bucket
-            System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, dataSize % bucketSize);
-          } else {
-            System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, bucketSize);
-            copyPtr += bucketSize;
-          }
-        }
+    // Compute N - number of buckets
+    int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
+    byte[] compressedRecord = new byte[dataSize];
+    String dataPath = path + "/" + versionToRead;
 
-        // Decompress the byte array
-        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedRecord);
-        byte[] serializedRecord;
-        try {
-          serializedRecord = GZipCompressionUtil.uncompress(byteArrayInputStream);
-        } catch (IOException e) {
-          throw new HelixException(String.format("Failed to decompress path: 
%s!", path), e);
-        }
+    List<String> paths = new ArrayList<>();
+    for (int i = 0; i < numBuckets; i++) {
+      paths.add(dataPath + "/" + i);
+    }
 
-        // Deserialize the record to retrieve the original
-        ZNRecord originalRecord = (ZNRecord) _zkSerializer.deserialize(serializedRecord);
-        return new HelixProperty(originalRecord);
-      } finally {
-        // Critical section for read ends here
-        unlock(path);
+    // Async get
+    List<byte[]> buckets = _zkBaseDataAccessor.get(paths, null, 
AccessOption.PERSISTENT, true);
+
+    // Combine buckets into one byte array
+    int copyPtr = 0;
+    for (int i = 0; i < numBuckets; i++) {
+      if (i == numBuckets - 1) {
+        // Special treatment for the last bucket
+        System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, dataSize % bucketSize);
+      } else {
+        System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, bucketSize);
+        copyPtr += bucketSize;
       }
     }
-    throw new HelixException(String.format("Could not acquire lock for read. 
Path: %s", path));
+
+    // Decompress the byte array
+    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedRecord);
+    byte[] serializedRecord;
+    try {
+      serializedRecord = GZipCompressionUtil.uncompress(byteArrayInputStream);
+    } catch (IOException e) {
+      throw new HelixException(String.format("Failed to decompress path: %s!", 
path), e);
+    }
+
+    // Deserialize the record to retrieve the original
+    ZNRecord originalRecord = (ZNRecord) _zkSerializer.deserialize(serializedRecord);
+    return new HelixProperty(originalRecord);
+  }
+
+  @Override
+  public void close() {
+    disconnect();
   }
 
   /**
-   * Returns the last string element in a split String array by /.
+   * Deletes all versions that are older than the currentVersion given.
    * @param path
-   * @return
+   * @param currentVersion
    */
-  private String extractIdFromPath(String path) {
-    String[] splitPath = path.split("/");
-    return splitPath[splitPath.length - 1];
+  private void deleteStaleVersions(String path, long currentVersion) {
+    // Get all children names under path
+    List<String> children = _zkBaseDataAccessor.getChildNames(path, 
AccessOption.PERSISTENT);
+    if (children == null || children.isEmpty()) {
+      // The whole path has been deleted so return immediately
+      return;
+    }
+    filterChildrenNames(children, currentVersion);
+    List<String> pathsToDelete = getPathsToDelete(path, children);
+    for (String pathToDelete : pathsToDelete) {
+      // TODO: Should be batch delete but it doesn't work. It's okay since this runs async
+      _zkBaseDataAccessor.remove(pathToDelete, AccessOption.PERSISTENT);
+    }
   }
 
   /**
-   * Acquires the lock (create an ephemeral node) only if it is free (no ephemeral node already
-   * exists) at the time of invocation.
-   * @param path
+   * Filter out currentVersion and non-version children names from the children list given.
+   * @param children
+   * @param currentVersion
+   */
+  private void filterChildrenNames(List<String> children, long currentVersion) 
{
+    // Leave out metadata
+    children.remove(LAST_SUCCESSFUL_WRITE_KEY);
+    children.remove(LAST_WRITE_KEY);
+
+    // Keep the children that are not expired and exclude currentVersion from deletion
+    long currTime = System.currentTimeMillis();
+    children.removeIf(childName -> currTime < extractTimestamp(childName) + _versionTTL
 
 Review comment:
   One more comment: this usage of versionTTL may leave some garbage uncollected if no new cleanup is triggered. Why not just sleep in the thread for versionTTL and then clean up everything before the current version?
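   
   A minimal sketch of that alternative, just to illustrate the idea (not part of this PR): schedule a one-shot task versionTTL ms after a successful write that drops every version strictly older than the one just written. The `_gcExecutor` field and `parseVersion()` helper below are hypothetical; `_zkBaseDataAccessor`, `_versionTTL`, `LAST_SUCCESSFUL_WRITE_KEY`, and `LAST_WRITE_KEY` are the existing class members.
   
   ```java
   // Class-level imports assumed:
   import java.util.List;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   
   // Hypothetical single-threaded scheduler for the delayed cleanup.
   private final ScheduledExecutorService _gcExecutor =
       Executors.newSingleThreadScheduledExecutor();
   
   private void scheduleStaleVersionCleanup(String path, long currentVersion) {
     _gcExecutor.schedule(() -> {
       // After versionTTL has elapsed, any reader that picked up an older version
       // has had the full TTL to finish, so everything strictly older than
       // currentVersion can be removed in one pass.
       List<String> children = _zkBaseDataAccessor.getChildNames(path, AccessOption.PERSISTENT);
       if (children == null || children.isEmpty()) {
         return; // the whole path has already been deleted
       }
       for (String child : children) {
         if (child.equals(LAST_SUCCESSFUL_WRITE_KEY) || child.equals(LAST_WRITE_KEY)) {
           continue; // keep the bookkeeping ZNodes
         }
         // parseVersion() is a hypothetical helper that extracts the numeric
         // version from a version child name.
         if (parseVersion(child) < currentVersion) {
           _zkBaseDataAccessor.remove(path + "/" + child, AccessOption.PERSISTENT);
         }
       }
     }, _versionTTL, TimeUnit.MILLISECONDS);
   }
   ```
   
   That would keep the cleanup to a single pass per write and avoid leaving stragglers behind whenever no later write comes along to trigger another cleanup.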
