jiajunwang commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor URL: https://github.com/apache/helix/pull/506#discussion_r333759980
########## File path: helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java ########## @@ -85,103 +83,117 @@ public Object deserialize(byte[] data) throws ZkMarshallingError { return data; } }); - _zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient); - - // TODO: Optimize serialization with Jackson - // TODO: Or use a better binary serialization protocol - // TODO: Consider making this also binary - // TODO: Consider an async write for the metadata as well - _znRecordClient = SharedZkClientFactory.getInstance() - .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr)); - _znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient); - _znRecordClient.setZkSerializer(new ZNRecordSerializer()); - + _zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient); _zkSerializer = new ZNRecordJacksonSerializer(); _bucketSize = bucketSize; - _numVersions = numVersions; } /** * Constructor that uses a default bucket size. * @param zkAddr */ public ZkBucketDataAccessor(String zkAddr) { - this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS); + this(zkAddr, DEFAULT_BUCKET_SIZE); } @Override public <T extends HelixProperty> boolean compressedBucketWrite(String path, T value) throws IOException { + AtomicLong versionRef = new AtomicLong(); + DataUpdater<byte[]> lastWriteVersionUpdater = dataInZk -> { + if (dataInZk == null || dataInZk.length == 0) { + // No last write version exists, so start with 0 + return Longs.toByteArray(0); + } + // Last write exists, so increment and write it back + long lastWriteVersion = Longs.fromByteArray(dataInZk); + lastWriteVersion++; + // Set the AtomicReference + versionRef.set(lastWriteVersion); + return Longs.toByteArray(lastWriteVersion); + }; + + // 1. 
Increment lastWriteVersion using DataUpdater + if (!_zkBaseDataAccessor.update(path + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater, + AccessOption.PERSISTENT)) { + throw new HelixException( + String.format("Failed to write the write version at path: %s!", path)); + } + // Successfully reserved a version number + final long version = versionRef.get(); + + // 2. Write to the incremented last write version + String versionedDataPath = path + "/" + version; + // Take the ZNrecord and serialize it (get byte[]) byte[] serializedRecord = _zkSerializer.serialize(value.getRecord()); // Compress the byte[] byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord); // Compute N - number of buckets int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize; - if (tryLock(path)) { - try { - // Read or initialize metadata and compute the last success version index - ZNRecord metadataRecord = - _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT); - if (metadataRecord == null) { - metadataRecord = new ZNRecord(extractIdFromPath(path)); - } - int lastSuccessIndex = - (metadataRecord.getIntField(LAST_SUCCESS_KEY, -1) + 1) % _numVersions; - String dataPath = path + "/" + lastSuccessIndex; - - List<String> paths = new ArrayList<>(); - List<Object> buckets = new ArrayList<>(); - - int ptr = 0; - int counter = 0; - while (counter < numBuckets) { - paths.add(dataPath + "/" + counter); - if (counter == numBuckets - 1) { - // Special treatment for the last bucket - buckets.add(Arrays.copyOfRange(compressedRecord, ptr, - ptr + compressedRecord.length % _bucketSize)); - } else { - buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + _bucketSize)); - } - ptr += _bucketSize; - counter++; - } + List<String> paths = new ArrayList<>(); + List<byte[]> buckets = new ArrayList<>(); + + int ptr = 0; + int counter = 0; + while (counter < numBuckets) { + paths.add(versionedDataPath + "/" + counter); + if (counter == numBuckets - 1) { + // Special 
treatment for the last bucket + buckets.add( + Arrays.copyOfRange(compressedRecord, ptr, ptr + compressedRecord.length % _bucketSize)); + } else { + buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + _bucketSize)); + } + ptr += _bucketSize; + counter++; + } - // Do a cleanup of previous data - if (!_zkBaseDataAccessor.remove(dataPath, AccessOption.PERSISTENT)) { - // Clean-up is not critical so upon failure, we log instead of throwing an exception - LOG.warn("Failed to clean up previous bucketed data in data path: {}", dataPath); - } + // Do an async set to ZK + boolean[] success = _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT); + // Exception and fail the write if any failed + for (boolean s : success) { + if (!s) { + throw new HelixException( + String.format("Failed to write the data buckets for path: %s", path)); + } + } - // Do an async set to ZK - boolean[] success = - _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT); - // Exception and fail the write if any failed - for (boolean s : success) { - if (!s) { - throw new HelixException( - String.format("Failed to write the data buckets for path: %s", path)); - } - } + // 3. 
Data write succeeded, so write the metadata by first serializing to byte array + Map<String, String> metadata = ImmutableMap.of(BUCKET_SIZE_KEY, Integer.toString(_bucketSize), + DATA_SIZE_KEY, Integer.toString(compressedRecord.length)); + byte[] binaryMetadata = OBJECT_MAPPER.writeValueAsBytes(metadata); + if (!_zkBaseDataAccessor.set(path + "/" + version + "/" + METADATA_KEY, binaryMetadata, + AccessOption.PERSISTENT)) { + throw new HelixException(String.format("Failed to write the metadata at path: %s!", path)); + } - // Data write completed, so update the metadata with last success index - // Note that the metadata ZNodes is written using sync write - metadataRecord.setIntField(BUCKET_SIZE_KEY, _bucketSize); - metadataRecord.setLongField(DATA_SIZE_KEY, compressedRecord.length); - metadataRecord.setIntField(LAST_SUCCESS_KEY, lastSuccessIndex); - if (!_znRecordBaseDataAccessor.set(path, metadataRecord, AccessOption.PERSISTENT)) { - throw new HelixException( - String.format("Failed to write the metadata at path: %s!", path)); - } - } finally { - // Critical section for write ends here - unlock(path); + // 4. Update lastSuccessfulWriteVersion using Updater + DataUpdater<byte[]> lastSuccessfulWriteVersionUpdater = dataInZk -> { + if (dataInZk == null || dataInZk.length == 0) { + // No last write version exists, so write version from this write + return Longs.toByteArray(version); } - return true; + // Last successful write exists so check if it's smaller than my number + long lastWriteVersion = Longs.fromByteArray(dataInZk); + if (lastWriteVersion < version) { + // Smaller, so I can overwrite + return Longs.toByteArray(version); + } else { + // Greater, I have lagged behind. Return the existing data + return dataInZk; Review comment: One way to avoid an unnecessary write here is to return null, so that _zkBaseDataAccessor won't trigger a real write. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org