narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r333777205
 
 

 ##########
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##########
 @@ -85,103 +83,117 @@ public Object deserialize(byte[] data) throws 
ZkMarshallingError {
         return data;
       }
     });
-    _zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-    // TODO: Optimize serialization with Jackson
-    // TODO: Or use a better binary serialization protocol
-    // TODO: Consider making this also binary
-    // TODO: Consider an async write for the metadata as well
-    _znRecordClient = SharedZkClientFactory.getInstance()
-        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-    _znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-    _znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+    _zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
     _zkSerializer = new ZNRecordJacksonSerializer();
     _bucketSize = bucketSize;
-    _numVersions = numVersions;
   }
 
   /**
    * Constructor that uses a default bucket size.
    * @param zkAddr
    */
   public ZkBucketDataAccessor(String zkAddr) {
-    this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+    this(zkAddr, DEFAULT_BUCKET_SIZE);
   }
 
   @Override
   public <T extends HelixProperty> boolean compressedBucketWrite(String path, 
T value)
       throws IOException {
+    AtomicLong versionRef = new AtomicLong();
+    DataUpdater<byte[]> lastWriteVersionUpdater = dataInZk -> {
+      if (dataInZk == null || dataInZk.length == 0) {
+        // No last write version exists, so start with 0
+        return Longs.toByteArray(0);
+      }
+      // Last write exists, so increment and write it back
+      long lastWriteVersion = Longs.fromByteArray(dataInZk);
+      lastWriteVersion++;
+      // Set the AtomicLong (versionRef) to the newly reserved version
+      versionRef.set(lastWriteVersion);
+      return Longs.toByteArray(lastWriteVersion);
+    };
+
+    // 1. Increment lastWriteVersion using DataUpdater
+    if (!_zkBaseDataAccessor.update(path + "/" + LAST_WRITE_KEY, 
lastWriteVersionUpdater,
+        AccessOption.PERSISTENT)) {
+      throw new HelixException(
+          String.format("Failed to write the write version at path: %s!", 
path));
+    }
+    // Successfully reserved a version number
+    final long version = versionRef.get();
+
+    // 2. Write to the incremented last write version
+    String versionedDataPath = path + "/" + version;
+
     // Take the ZNrecord and serialize it (get byte[])
     byte[] serializedRecord = _zkSerializer.serialize(value.getRecord());
     // Compress the byte[]
     byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord);
     // Compute N - number of buckets
     int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize;
 
-    if (tryLock(path)) {
-      try {
-        // Read or initialize metadata and compute the last success version 
index
-        ZNRecord metadataRecord =
-            _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-        if (metadataRecord == null) {
-          metadataRecord = new ZNRecord(extractIdFromPath(path));
-        }
-        int lastSuccessIndex =
-            (metadataRecord.getIntField(LAST_SUCCESS_KEY, -1) + 1) % 
_numVersions;
-        String dataPath = path + "/" + lastSuccessIndex;
-
-        List<String> paths = new ArrayList<>();
-        List<Object> buckets = new ArrayList<>();
-
-        int ptr = 0;
-        int counter = 0;
-        while (counter < numBuckets) {
-          paths.add(dataPath + "/" + counter);
-          if (counter == numBuckets - 1) {
-            // Special treatment for the last bucket
-            buckets.add(Arrays.copyOfRange(compressedRecord, ptr,
-                ptr + compressedRecord.length % _bucketSize));
-          } else {
-            buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + 
_bucketSize));
-          }
-          ptr += _bucketSize;
-          counter++;
-        }
+    List<String> paths = new ArrayList<>();
+    List<byte[]> buckets = new ArrayList<>();
+
+    int ptr = 0;
+    int counter = 0;
+    while (counter < numBuckets) {
+      paths.add(versionedDataPath + "/" + counter);
+      if (counter == numBuckets - 1) {
+        // Special treatment for the last bucket
+        buckets.add(
+            Arrays.copyOfRange(compressedRecord, ptr, ptr + 
compressedRecord.length % _bucketSize));
+      } else {
+        buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + 
_bucketSize));
+      }
+      ptr += _bucketSize;
+      counter++;
+    }
 
-        // Do a cleanup of previous data
-        if (!_zkBaseDataAccessor.remove(dataPath, AccessOption.PERSISTENT)) {
-          // Clean-up is not critical so upon failure, we log instead of 
throwing an exception
-          LOG.warn("Failed to clean up previous bucketed data in data path: 
{}", dataPath);
-        }
+    // Do an async set to ZK
+    boolean[] success = _zkBaseDataAccessor.setChildren(paths, buckets, 
AccessOption.PERSISTENT);
+    // Exception and fail the write if any failed
+    for (boolean s : success) {
+      if (!s) {
+        throw new HelixException(
+            String.format("Failed to write the data buckets for path: %s", 
path));
+      }
+    }
 
-        // Do an async set to ZK
-        boolean[] success =
-            _zkBaseDataAccessor.setChildren(paths, buckets, 
AccessOption.PERSISTENT);
-        // Exception and fail the write if any failed
-        for (boolean s : success) {
-          if (!s) {
-            throw new HelixException(
-                String.format("Failed to write the data buckets for path: %s", 
path));
-          }
-        }
+    // 3. Data write succeeded, so write the metadata by first serializing to 
byte array
+    Map<String, String> metadata = ImmutableMap.of(BUCKET_SIZE_KEY, 
Integer.toString(_bucketSize),
+        DATA_SIZE_KEY, Integer.toString(compressedRecord.length));
+    byte[] binaryMetadata = OBJECT_MAPPER.writeValueAsBytes(metadata);
+    if (!_zkBaseDataAccessor.set(path + "/" + version + "/" + METADATA_KEY, 
binaryMetadata,
+        AccessOption.PERSISTENT)) {
+      throw new HelixException(String.format("Failed to write the metadata at 
path: %s!", path));
+    }
 
-        // Data write completed, so update the metadata with last success index
-        // Note that the metadata ZNodes is written using sync write
-        metadataRecord.setIntField(BUCKET_SIZE_KEY, _bucketSize);
-        metadataRecord.setLongField(DATA_SIZE_KEY, compressedRecord.length);
-        metadataRecord.setIntField(LAST_SUCCESS_KEY, lastSuccessIndex);
-        if (!_znRecordBaseDataAccessor.set(path, metadataRecord, 
AccessOption.PERSISTENT)) {
-          throw new HelixException(
-              String.format("Failed to write the metadata at path: %s!", 
path));
-        }
-      } finally {
-        // Critical section for write ends here
-        unlock(path);
+    // 4. Update lastSuccessfulWriteVersion using Updater
+    DataUpdater<byte[]> lastSuccessfulWriteVersionUpdater = dataInZk -> {
+      if (dataInZk == null || dataInZk.length == 0) {
+        // No last write version exists, so write version from this write
+        return Longs.toByteArray(version);
       }
-      return true;
+      // Last successful write exists so check if it's smaller than my number
+      long lastWriteVersion = Longs.fromByteArray(dataInZk);
+      if (lastWriteVersion < version) {
+        // Smaller, so I can overwrite
+        return Longs.toByteArray(version);
+      } else {
+        // Greater, I have lagged behind. Return the existing data
+        return dataInZk;
+      }
+    };
+    if (!_zkBaseDataAccessor.update(path + "/" + LAST_SUCCESSFUL_WRITE_KEY,
+        lastSuccessfulWriteVersionUpdater, AccessOption.PERSISTENT)) {
+      throw new HelixException(
+          String.format("Failed to write the last successful write metadata at 
path: %s!", path));
     }
-    throw new HelixException(String.format("Could not acquire lock for write. 
Path: %s", path));
+
+    // 5. Garbage collect stale versions
+    new Thread(() -> deleteStaleVersions(path, 
Long.toString(version))).start();
 
 Review comment:
   Added a thread pool of size 1. We might have to bound the size of the queue,
but I'm not too worried about that for now.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org

Reply via email to